From: guozhang@apache.org
To: commits@kafka.apache.org
Subject: kafka git commit: KAFKA-2168: minor follow-up patch; reviewed by Guozhang Wang
Date: Wed, 1 Jul 2015 22:29:18 +0000 (UTC)

Repository: kafka
Updated Branches:
  refs/heads/trunk 9ff5b27bc -> 14e0ce0a4


KAFKA-2168: minor follow-up patch; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/14e0ce0a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/14e0ce0a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/14e0ce0a

Branch: refs/heads/trunk
Commit: 14e0ce0a47fb7f6ae6dab085b2ea9d5a1f644433
Parents: 9ff5b27
Author: Jason Gustafson
Authored: Wed Jul 1 15:28:11 2015 -0700
Committer: Guozhang Wang
Committed: Wed Jul 1 15:28:11 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java       | 17 ++++++++++-------
 .../clients/consumer/internals/Coordinator.java     | 10 +++++-----
 .../integration/kafka/api/ConsumerBounceTest.scala  | 14 ++++++--------
 3 files changed, 21 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/14e0ce0a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 9be8fbc..1f0e515 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -42,6 +42,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -332,6 +333,7 @@ import static org.apache.kafka.common.utils.Utils.min;
  *             }
  *         }
  *
+ *     // Shutdown hook which can be called from a separate thread
  *     public void shutdown() {
  *         closed.set(true);
  *         consumer.wakeup();
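A note on the javadoc hunk above: the comment it adds documents the intended shutdown pattern for the multi-threaded usage example, where a second thread sets a flag and then calls wakeup() to break a blocking poll(). A minimal self-contained sketch of that pattern follows. It is written against the later released consumer API (Collection-based subscribe(), org.apache.kafka.common.errors.WakeupException); the topic name and record handling are placeholders, not part of this patch.

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    public class ConsumerLoop implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<String, String> consumer;

        public ConsumerLoop(Properties config) {
            this.consumer = new KafkaConsumer<>(config);
        }

        @Override
        public void run() {
            try {
                consumer.subscribe(Collections.singletonList("my-topic"));
                while (!closed.get()) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records)
                        process(record);
                }
            } catch (WakeupException e) {
                // wakeup() makes a blocking poll() throw; only rethrow if this was not a shutdown
                if (!closed.get())
                    throw e;
            } finally {
                consumer.close();
            }
        }

        // Shutdown hook which can be called from a separate thread
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }

        private void process(ConsumerRecord<String, String> record) {
            // application-specific handling of each record
        }
    }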
@@ -417,7 +419,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     // and is used to prevent multi-threaded access
     private final AtomicReference<Long> currentThread = new AtomicReference<>();
     // refcount is used to allow reentrant access by the thread who has acquired currentThread
-    private int refcount = 0; // reference count for reentrant access
+    private final AtomicInteger refcount = new AtomicInteger(0);
 
     // TODO: This timeout controls how long we should wait before retrying a request. We should be able
     // to leverage the work of KAFKA-2120 to get this value from configuration.
@@ -795,7 +797,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
      * should not be used.
      * <p>
-     * A non-blocking commit will attempt to commit offsets asychronously. No error will be thrown if the commit fails.
+     * A non-blocking commit will attempt to commit offsets asynchronously. No error will be thrown if the commit fails.
      * A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until
      * the commit succeeds.
      *
@@ -832,7 +834,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public void commit(CommitType commitType) {
         acquire();
         try {
-            commit(this.subscriptions.allConsumed(), commitType);
+            // Need defensive copy to ensure offsets are not removed before completion (e.g. in rebalance)
+            Map<TopicPartition, Long> allConsumed = new HashMap<TopicPartition, Long>(this.subscriptions.allConsumed());
+            commit(allConsumed, commitType);
         } finally {
             release();
         }
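The new comment in the commit(CommitType) hunk explains the motivation: subscriptions.allConsumed() is a live view of consumer state, and a rebalance can remove partitions from it before an asynchronous commit completes, so the offsets are snapshotted into a fresh HashMap at call time. The hazard and the fix can be illustrated in isolation as follows; the class and method names here are illustrative only and are not Kafka code.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Illustrative only: why a snapshot is taken before an asynchronous commit.
    public class OffsetSnapshotExample {
        // Live view of consumed offsets; entries may disappear on rebalance.
        private final Map<String, Long> allConsumed = new ConcurrentHashMap<>();
        private final ExecutorService sender = Executors.newSingleThreadExecutor();

        public void commitAsyncUnsafe() {
            // Hazard: the async task reads the live map later, after a rebalance
            // may already have removed some of the partitions being committed.
            sender.execute(() -> send(allConsumed));
        }

        public void commitAsyncSafe() {
            // Snapshot at call time, analogous to new HashMap<>(subscriptions.allConsumed()).
            Map<String, Long> snapshot = new HashMap<>(allConsumed);
            sender.execute(() -> send(snapshot));
        }

        private void send(Map<String, Long> offsets) {
            // build and send the commit request from 'offsets'
        }
    }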
@@ -978,10 +982,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     @Override
     public void close() {
-        if (closed) return;
-
         acquire();
         try {
+            if (closed) return;
             close(false);
         } finally {
             release();
         }
@@ -1355,14 +1358,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         Long threadId = Thread.currentThread().getId();
         if (!threadId.equals(currentThread.get()) && !currentThread.compareAndSet(null, threadId))
             throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
-        refcount++;
+        refcount.incrementAndGet();
     }
 
     /**
      * Release the light lock protecting the consumer from multi-threaded access.
      */
     private void release() {
-        if (--refcount == 0)
+        if (refcount.decrementAndGet() == 0)
             currentThread.set(null);
     }
 }
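The two hunks above complete the switch of the consumer's "light lock" to atomics: acquire() fails fast with a ConcurrentModificationException when a second thread calls in, and the refcount lets the thread that already holds the consumer re-enter without tripping the check. Extracted into a standalone class purely for illustration (the LightLock name is not from this patch), the mechanism looks like this:

    import java.util.ConcurrentModificationException;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;

    // Illustrative extraction of the consumer's reentrant, non-blocking ownership check.
    public final class LightLock {
        private final AtomicReference<Long> currentThread = new AtomicReference<>();
        private final AtomicInteger refcount = new AtomicInteger(0);

        public void acquire() {
            Long threadId = Thread.currentThread().getId();
            // Fail fast instead of blocking when another thread already owns the consumer.
            if (!threadId.equals(currentThread.get()) && !currentThread.compareAndSet(null, threadId))
                throw new ConcurrentModificationException("not safe for multi-threaded access");
            refcount.incrementAndGet();
        }

        public void release() {
            // Only clear ownership once the outermost acquire() has been released.
            if (refcount.decrementAndGet() == 0)
                currentThread.set(null);
        }
    }

Every public consumer method wraps its body in acquire()/try/finally release(); in the close() hunk above the early "if (closed) return" check is moved inside that pair so it too runs under the single-thread check.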
http://git-wip-us.apache.org/repos/asf/kafka/blob/14e0ce0a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 6c26667..68b4cb1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -217,7 +217,7 @@ public final class Coordinator {
                 OffsetCommitRequest.DEFAULT_RETENTION_TIME,
                 offsetData);
 
-        RequestCompletionHandler handler = new CommitOffsetCompletionHandler(offsets, future);
+        RequestCompletionHandler handler = new OffsetCommitCompletionHandler(offsets, future);
 
         sendCoordinator(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
     }
@@ -261,14 +261,14 @@ public final class Coordinator {
         RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
             @Override
             public void onComplete(ClientResponse resp) {
-                handleOffsetResponse(resp, future);
+                handleOffsetFetchResponse(resp, future);
             }
         };
         sendCoordinator(ApiKeys.OFFSET_FETCH, request.toStruct(), completionHandler, now);
         return future;
     }
 
-    private void handleOffsetResponse(ClientResponse resp, RequestFuture<Map<TopicPartition, Long>> future) {
+    private void handleOffsetFetchResponse(ClientResponse resp, RequestFuture<Map<TopicPartition, Long>> future) {
         if (resp.wasDisconnected()) {
             handleCoordinatorDisconnect(resp);
             future.retryWithNewCoordinator();
@@ -471,12 +471,12 @@ public final class Coordinator {
         }
     }
 
-    private class CommitOffsetCompletionHandler implements RequestCompletionHandler {
+    private class OffsetCommitCompletionHandler implements RequestCompletionHandler {
 
         private final Map<TopicPartition, Long> offsets;
         private final RequestFuture<Void> future;
 
-        public CommitOffsetCompletionHandler(Map<TopicPartition, Long> offsets, RequestFuture<Void> future) {
+        public OffsetCommitCompletionHandler(Map<TopicPartition, Long> offsets, RequestFuture<Void> future) {
            this.offsets = offsets;
            this.future = future;
        }


http://git-wip-us.apache.org/repos/asf/kafka/blob/14e0ce0a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index f56096b..b0750fa 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -14,14 +14,10 @@ package kafka.api
 
 import kafka.server.KafkaConfig
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.consumer.CommitType
+import kafka.utils.{Logging, ShutdownableThread, TestUtils}
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
-
-import kafka.utils.{ShutdownableThread, TestUtils, Logging}
-
 import org.junit.Assert._
 
 import scala.collection.JavaConversions._
@@ -85,9 +81,11 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
         assertEquals(consumed.toLong, record.offset())
         consumed += 1
       }
+      consumer.commit(CommitType.SYNC)
+      assertEquals(consumer.position(tp), consumer.committed(tp))
 
-      if (consumed == numRecords) {
+      if (consumer.position(tp) == numRecords) {
        consumer.seekToBeginning()
        consumed = 0
      }
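The test change above replaces the locally tracked "consumed" counter with state read back from the consumer itself: after a synchronous commit, the committed offset for the partition must equal the consumer's current position. The same invariant, expressed against the later released Java consumer API (commitSync() and OffsetAndMetadata instead of commit(CommitType.SYNC) and a raw Long), would look roughly like the sketch below; it is an illustration, not code from this patch.

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // Sketch of the invariant asserted by the test, using the released consumer API.
    public class CommitPositionInvariant {
        public static void check(KafkaConsumer<?, ?> consumer, TopicPartition tp) {
            consumer.commitSync();
            long position = consumer.position(tp);
            long committed = consumer.committed(tp).offset();
            if (position != committed)
                throw new AssertionError("committed offset " + committed
                    + " does not match position " + position);
        }
    }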