kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-2311; Make KafkaConsumer's ensureNotClosed method thread-safe
Date Tue, 13 Sep 2016 03:31:00 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a7ab9cb83 -> 2a660f13d


KAFKA-2311; Make KafkaConsumer's ensureNotClosed method thread-safe

Here is the patch on GitHub, ijuma.

Acquiring the consumer lock (the single-thread access control) requires that the consumer
be open. I changed the closed variable to be volatile so that another thread's writes will
be visible to the reading thread.

Additionally, close() checked again whether the consumer was closed after the lock was
acquired. This check is no longer necessary, since acquiring the lock already requires
that the consumer be open.
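
To illustrate the reasoning above, here is a minimal, self-contained sketch of the pattern. It is
not the actual KafkaConsumer source: the class name GuardedResource and the exception messages
are hypothetical, and the lock logic is a simplified approximation of the consumer's
single-thread access control.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the consumer; names are illustrative only.
class GuardedResource {
    private static final long NO_CURRENT_THREAD = -1L;

    // volatile, so a write made by the closing thread is visible to
    // any other thread that later reads the flag in ensureNotClosed().
    private volatile boolean closed = false;

    // Id of the thread currently holding the "lock", or NO_CURRENT_THREAD.
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    // Counts nested acquire() calls made by the owning thread.
    private final AtomicInteger refcount = new AtomicInteger(0);

    private void ensureNotClosed() {
        if (closed)
            throw new IllegalStateException("This resource has already been closed.");
    }

    // Fails fast if the resource is closed or another thread holds the lock.
    void acquire() {
        ensureNotClosed();
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException("Not safe for multi-threaded access");
        refcount.incrementAndGet();
    }

    void release() {
        if (refcount.decrementAndGet() == 0)
            currentThread.set(NO_CURRENT_THREAD);
    }

    public void close() {
        acquire();            // throws if already closed, so no second check is needed
        try {
            closed = true;    // stands in for the real cleanup work
        } finally {
            release();
        }
    }

    boolean isClosed() {
        return closed;
    }
}
```

In this sketch, a second call to close() throws IllegalStateException from acquire(), which is
why a separate `if (closed) return;` after acquire() would never be reached with closed == true.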

This is my original work and I license it to the project under the project's open source license.

Author: Tim Brooks <tim@uncontended.net>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #1637 from tbrooks8/KAFKA-2311


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a660f13
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a660f13
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a660f13

Branch: refs/heads/trunk
Commit: 2a660f13d2a805f0e27351996e904b1ea2365eba
Parents: a7ab9cb
Author: Tim Brooks <tim@uncontended.net>
Authored: Mon Sep 12 20:28:01 2016 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon Sep 12 20:28:01 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/consumer/KafkaConsumer.java     | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2a660f13/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index ade4243..cfa046f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -508,7 +508,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final Metadata metadata;
     private final long retryBackoffMs;
     private final long requestTimeoutMs;
-    private boolean closed = false;
+    private volatile boolean closed = false;
 
     // currentThread holds the threadId of the current thread accessing KafkaConsumer
     // and is used to prevent multi-threaded access
@@ -1397,7 +1397,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public void close() {
         acquire();
         try {
-            if (closed) return;
             close(false);
         } finally {
             release();

