kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2533: Create a member Metadata.Listener inside KafkaConsumer
Date Mon, 21 Sep 2015 19:04:29 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9ae98685e -> e18f6860c


KAFKA-2533: Create a member Metadata.Listener inside KafkaConsumer

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Guozhang Wang

Closes #220 from SinghAsDev/KAFKA-2533


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e18f6860
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e18f6860
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e18f6860

Branch: refs/heads/trunk
Commit: e18f6860c2b297cc2ce06f9f3cb37987af3bdbae
Parents: 9ae9868
Author: Ashish Singh <asingh@cloudera.com>
Authored: Mon Sep 21 12:07:56 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Sep 21 12:07:56 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 33 ++++++++++----------
 1 file changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e18f6860/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3ac2be8..3b461b3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -395,7 +395,7 @@ import java.util.regex.Pattern;
  *
  */
 @InterfaceStability.Unstable
-public class KafkaConsumer<K, V> implements Consumer<K, V>, Metadata.Listener
{
+public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
     private static final long NO_CURRENT_THREAD = -1L;
@@ -417,6 +417,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>,
Metadata.Listener {
     private final boolean autoCommit;
     private final long autoCommitIntervalMs;
     private boolean closed = false;
+    private Metadata.Listener metadataListener;
 
     // currentThread holds the threadId of the current thread accessing KafkaConsumer
     // and is used to prevent multi-threaded access
@@ -698,9 +699,22 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>,
Metadata.Listener {
         acquire();
         try {
             log.debug("Subscribed to pattern: {}", pattern);
+            metadataListener = new Metadata.Listener() {
+                @Override
+                public void onMetadataUpdate(Cluster cluster) {
+                    final List<String> topicsToSubscribe = new ArrayList<>();
+
+                    for (String topic : cluster.topics())
+                        if (subscriptions.getSubscribedPattern().matcher(topic).matches())
+                            topicsToSubscribe.add(topic);
+
+                    subscriptions.changeSubscription(topicsToSubscribe);
+                    metadata.setTopics(topicsToSubscribe);
+                }
+            };
             this.subscriptions.subscribe(pattern, SubscriptionState.wrapListener(this, listener));
             this.metadata.needMetadataForAllTopics(true);
-            this.metadata.addListener(this);
+            this.metadata.addListener(metadataListener);
         } finally {
             release();
         }
@@ -714,7 +728,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>,
Metadata.Listener {
         try {
             this.subscriptions.unsubscribe();
             this.metadata.needMetadataForAllTopics(false);
-            this.metadata.removeListener(this);
+            this.metadata.removeListener(metadataListener);
         } finally {
             release();
         }
@@ -1212,17 +1226,4 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>,
Metadata.Listener {
         if (refcount.decrementAndGet() == 0)
             currentThread.set(NO_CURRENT_THREAD);
     }
-
-    @Override
-    public void onMetadataUpdate(Cluster cluster) {
-        final List<String> topicsToSubscribe = new ArrayList<>();
-
-        for (String topic : cluster.topics())
-            if (this.subscriptions.getSubscribedPattern().matcher(topic).matches())
-                topicsToSubscribe.add(topic);
-
-        subscriptions.changeSubscription(topicsToSubscribe);
-        metadata.setTopics(topicsToSubscribe);
-    }
-
 }


Mime
View raw message