kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-1893: Allow regex subscriptions in the new consumer
Date Thu, 10 Sep 2015 20:11:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b8b1bca44 -> fd1239658


KAFKA-1893: Allow regex subscriptions in the new consumer

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Jason Gustafson, Guozhang Wang, Edward Ribeiro, Ismael Juma

Closes #128 from SinghAsDev/KAFKA-1893


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fd123965
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fd123965
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fd123965

Branch: refs/heads/trunk
Commit: fd123965802f900fa474a8d5b1539e62d28bff73
Parents: b8b1bca
Author: Ashish Singh <asingh@cloudera.com>
Authored: Thu Sep 10 13:14:24 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Sep 10 13:14:24 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/Metadata.java | 70 +++++++++++++--
 .../org/apache/kafka/clients/NetworkClient.java |  3 +-
 .../apache/kafka/clients/consumer/Consumer.java | 11 +++
 .../kafka/clients/consumer/KafkaConsumer.java   | 61 ++++++++++++-
 .../kafka/clients/consumer/MockConsumer.java    | 21 +++++
 .../clients/consumer/internals/Fetcher.java     | 19 ++--
 .../consumer/internals/SubscriptionState.java   | 47 ++++++++--
 .../org/apache/kafka/clients/MetadataTest.java  | 88 +++++++++++++++++-
 .../internals/SubscriptionStateTest.java        | 69 +++++++++++++++
 .../integration/kafka/api/ConsumerTest.scala    | 93 +++++++++++++++++++-
 10 files changed, 458 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fd123965/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 3f3a5f5..7d4ffa7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -12,15 +12,17 @@
  */
 package org.apache.kafka.clients;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
 /**
  * A class encapsulating some of the logic around metadata.
  * <p>
@@ -41,6 +43,8 @@ public final class Metadata {
     private Cluster cluster;
     private boolean needUpdate;
     private final Set<String> topics;
+    private final List<Listener> listeners;
+    private boolean needMetadataForAllTopics;
 
     /**
      * Create a metadata instance with reasonable defaults
@@ -64,6 +68,8 @@ public final class Metadata {
         this.cluster = Cluster.empty();
         this.needUpdate = false;
         this.topics = new HashSet<String>();
+        this.listeners = new ArrayList<>();
+        this.needMetadataForAllTopics = false;
     }
 
     /**
@@ -153,11 +159,17 @@ public final class Metadata {
         this.lastRefreshMs = now;
         this.lastSuccessfulRefreshMs = now;
         this.version += 1;
-        this.cluster = cluster;
+
+        for (Listener listener: listeners)
+            listener.onMetadataUpdate(cluster);
+
+        // Do this after notifying listeners as subscribed topics' list can be changed by listeners
+        this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster)
: cluster;
+
         notifyAll();
         log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
     }
-    
+
     /**
      * Record an attempt to update the metadata that failed. We need to keep track of this
      * to avoid retrying immediately.
@@ -186,4 +198,50 @@ public final class Metadata {
     public long refreshBackoff() {
         return refreshBackoffMs;
     }
+
+    /**
+     * Set state to indicate if metadata for all topics in Kafka cluster is required or not.
+     * @param needMetadaForAllTopics boolean indicating need for metadata of all topics in cluster.
+     */
+    public void needMetadataForAllTopics(boolean needMetadaForAllTopics) {
+        this.needMetadataForAllTopics = needMetadaForAllTopics;
+    }
+
+    /**
+     * Get whether metadata for all topics is needed or not
+     */
+    public boolean needMetadataForAllTopics() {
+        return this.needMetadataForAllTopics;
+    }
+
+    /**
+     * Add a Metadata listener that gets notified of metadata updates
+     */
+    public void addListener(Listener listener) {
+        this.listeners.add(listener);
+    }
+
+    /**
+     * Stop notifying the listener of metadata updates
+     */
+    public void removeListener(Listener listener) {
+        this.listeners.remove(listener);
+    }
+
+    /**
+     * MetadataUpdate Listener
+     */
+    public interface Listener {
+        void onMetadataUpdate(Cluster cluster);
+    }
+
+    private Cluster getClusterForCurrentTopics(Cluster cluster) {
+        Collection<PartitionInfo> partitionInfos = new ArrayList<>();
+        if (cluster != null) {
+            for (String topic : this.topics) {
+                partitionInfos.addAll(cluster.partitionsForTopic(topic));
+            }
+        }
+        return new Cluster(cluster.nodes(), partitionInfos);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd123965/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 049b22e..1302f35 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -15,6 +15,7 @@ package org.apache.kafka.clients;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
@@ -598,7 +599,7 @@ public class NetworkClient implements KafkaClient {
             String nodeConnectionId = node.idString();
 
             if (canSendRequest(nodeConnectionId)) {
-                Set<String> topics = metadata.topics();
+                Set<String> topics = metadata.needMetadataForAllTopics() ? new HashSet<String>()
: metadata.topics();
                 this.metadataFetchInProgress = true;
                 ClientRequest metadataRequest = request(now, nodeConnectionId, topics);
                 log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd123965/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 509918c..54e8869 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -16,6 +16,7 @@ import java.io.Closeable;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.PartitionInfo;
@@ -56,6 +57,16 @@ public interface Consumer<K, V> extends Closeable {
     public void assign(List<TopicPartition> partitions);
 
     /**
+    * @see KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener)
+    */
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
+
+    /**
+     * @see KafkaConsumer#unsubscribe()
+     */
+    public void unsubscribe();
+
+    /**
      * @see KafkaConsumer#poll(long)
      */
     public ConsumerRecords<K, V> poll(long timeout);

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd123965/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 19ef6eb..5763bac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -43,6 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -57,6 +58,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
 
 /**
  * A Kafka client that consumes records from a Kafka cluster.
@@ -393,7 +395,7 @@ import java.util.concurrent.atomic.AtomicReference;
  *
  */
 @InterfaceStability.Unstable
-public class KafkaConsumer<K, V> implements Consumer<K, V> {
+public class KafkaConsumer<K, V> implements Consumer<K, V>, Metadata.Listener
{
 
     private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
     private static final long NO_CURRENT_THREAD = -1L;
@@ -674,6 +676,50 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
     }
 
     /**
+     * Subscribes to topics matching specified pattern and uses the consumer's group
+     * management functionality. The pattern matching will be done periodically against topics
+     * existing at the time of check.
+     * <p>
+     * As part of group management, the consumer will keep track of the list of consumers that
+     * belong to a particular group and will trigger a rebalance operation if one of the
+     * following events trigger -
+     * <ul>
+     * <li>Number of partitions change for any of the subscribed list of topics
+     * <li>Topic is created or deleted
+     * <li>An existing member of the consumer group dies
+     * <li>A new member is added to an existing consumer group via the join API
+     * </ul>
+     *
+     * @param pattern Pattern to subscribe to
+     */
+    @Override
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+        acquire();
+        try {
+            log.debug("Subscribed to pattern: {}", pattern);
+            this.subscriptions.subscribe(pattern, SubscriptionState.wrapListener(this, listener));
+            this.metadata.needMetadataForAllTopics(true);
+            this.metadata.addListener(this);
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Unsubscribe from topics currently subscribed to
+     */
+    public void unsubscribe() {
+        acquire();
+        try {
+            this.subscriptions.unsubscribe();
+            this.metadata.needMetadataForAllTopics(false);
+            this.metadata.removeListener(this);
+        } finally {
+            release();
+        }
+    }
+
+    /**
     * Assign a list of partition to this consumer. This interface does not allow for incremental assignment
      * and will replace the previous assignment (if there is one).
      * <p>
@@ -1156,4 +1202,17 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
         if (refcount.decrementAndGet() == 0)
             currentThread.set(NO_CURRENT_THREAD);
     }
+
+    @Override
+    public void onMetadataUpdate(Cluster cluster) {
+        final List<String> topicsToSubscribe = new ArrayList<>();
+
+        for (String topic : cluster.topics())
+            if (this.subscriptions.getSubscribedPattern().matcher(topic).matches())
+                topicsToSubscribe.add(topic);
+
+        subscriptions.changeSubscription(topicsToSubscribe);
+        metadata.setTopics(topicsToSubscribe);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd123965/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index e33f120..7e038f2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 /**
 * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not
@@ -64,6 +65,20 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
+    public void subscribe(Pattern pattern, final ConsumerRebalanceListener listener) {
+        ensureNotClosed();
+        this.subscriptions.subscribe(pattern, SubscriptionState.wrapListener(this, listener));
+        List<String> topicsToSubscribe = new ArrayList<>();
+        for (String topic: partitions.keySet()) {
+            if (pattern.matcher(topic).matches() &&
+                !subscriptions.subscription().contains(topic))
+                topicsToSubscribe.add(topic);
+        }
+        ensureNotClosed();
+        this.subscriptions.changeSubscription(topicsToSubscribe);
+    }
+
+    @Override
     public synchronized void subscribe(List<String> topics, final ConsumerRebalanceListener
listener) {
         ensureNotClosed();
         this.subscriptions.subscribe(topics, SubscriptionState.wrapListener(this, listener));
@@ -76,6 +91,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
+    public void unsubscribe() {
+        ensureNotClosed();
+        subscriptions.unsubscribe();
+    }
+
+    @Override
     public synchronized ConsumerRecords<K, V> poll(long timeout) {
         ensureNotClosed();
         // update the consumed offset

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd123965/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 1ae6d03..a797c79 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -174,12 +174,8 @@ public class Fetcher<K, V> {
         long startTime = time.milliseconds();
 
         while (time.milliseconds() - startTime < timeout) {
-            final Node node = client.leastLoadedNode();
-            if (node != null) {
-                MetadataRequest metadataRequest = new MetadataRequest(Collections.<String>emptyList());
-                final RequestFuture<ClientResponse> requestFuture =
-                    client.send(node, ApiKeys.METADATA, metadataRequest);
-
+            RequestFuture<ClientResponse> requestFuture = sendMetadataRequest();
+            if (requestFuture != null) {
                 client.poll(requestFuture);
 
                 if (requestFuture.succeeded()) {
@@ -204,6 +200,17 @@ public class Fetcher<K, V> {
     }
 
     /**
+     * Send Metadata Request to least loaded node in Kafka cluster asynchronously
+     * @return A future that indicates result of sent metadata request
+     */
+    public RequestFuture<ClientResponse> sendMetadataRequest() {
+        final Node node = client.leastLoadedNode();
+        return node == null ? null :
+            client.send(
+                node, ApiKeys.METADATA, new MetadataRequest(Collections.<String>emptyList()));
+    }
+
+    /**
      * Reset offsets for the given partition using the offset reset strategy.
      *
      * @param partition The given partition that needs reset offset

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd123965/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index ec6b424..c92a581 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 /**
  * A class for tracking the topics, partitions, and offsets for the consumer. A partition
@@ -47,6 +48,9 @@ import java.util.Set;
  */
 public class SubscriptionState {
 
+    /* the pattern user has requested */
+    private Pattern subscribedPattern;
+
     /* the list of topics the user has requested */
     private final Set<String> subscription;
 
@@ -67,6 +71,8 @@ public class SubscriptionState {
 
     /* Listener to be invoked when assignment changes */
     private RebalanceListener listener;
+    private static final String SUBSCRIPTION_EXCEPTION_MESSAGE =
+        "Subscription to topics, partitions and pattern are mutually exclusive";
 
     public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
         this.defaultResetStrategy = defaultResetStrategy;
@@ -75,20 +81,25 @@ public class SubscriptionState {
         this.assignment = new HashMap<>();
         this.needsPartitionAssignment = false;
         this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to
fetch offset upon starting up
+        this.subscribedPattern = null;
     }
 
     public void subscribe(List<String> topics, RebalanceListener listener) {
         if (listener == null)
             throw new IllegalArgumentException("RebalanceListener cannot be null");
 
-        if (!this.userAssignment.isEmpty())
-            throw new IllegalStateException("Subscription to topics and partitions are mutually
exclusive");
+        if (!this.userAssignment.isEmpty() || this.subscribedPattern != null)
+            throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
 
         this.listener = listener;
 
-        if (!this.subscription.equals(new HashSet<>(topics))) {
+        changeSubscription(topics);
+    }
+
+    public void changeSubscription(List<String> topicsToSubscribe) {
+        if (!this.subscription.equals(new HashSet<>(topicsToSubscribe))) {
             this.subscription.clear();
-            this.subscription.addAll(topics);
+            this.subscription.addAll(topicsToSubscribe);
             this.needsPartitionAssignment = true;
 
             // Remove any assigned partitions which are no longer subscribed to
@@ -98,6 +109,7 @@ public class SubscriptionState {
                     it.remove();
             }
         }
+
     }
 
     public void needReassignment() {
@@ -105,8 +117,8 @@ public class SubscriptionState {
     }
 
     public void assign(List<TopicPartition> partitions) {
-        if (!this.subscription.isEmpty())
-            throw new IllegalStateException("Subscription to topics and partitions are mutually
exclusive");
+        if (!this.subscription.isEmpty() || this.subscribedPattern != null)
+            throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
 
         this.userAssignment.clear();
         this.userAssignment.addAll(partitions);
@@ -118,6 +130,29 @@ public class SubscriptionState {
         this.assignment.keySet().retainAll(this.userAssignment);
     }
 
+    public void subscribe(Pattern pattern, RebalanceListener listener) {
+        if (listener == null)
+            throw new IllegalArgumentException("RebalanceListener cannot be null");
+
+        if (!this.subscription.isEmpty() || !this.userAssignment.isEmpty())
+            throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
+
+        this.listener = listener;
+        this.subscribedPattern = pattern;
+    }
+
+    public void unsubscribe() {
+        this.subscription.clear();
+        this.assignment.clear();
+        this.needsPartitionAssignment = true;
+        this.subscribedPattern = null;
+    }
+
+
+    public Pattern getSubscribedPattern() {
+        return this.subscribedPattern;
+    }
+
     public void clearAssignment() {
         this.assignment.clear();
         this.needsPartitionAssignment = !subscription().isEmpty();

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd123965/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 5fe8821..c42c7bc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -12,9 +12,16 @@
  */
 package org.apache.kafka.clients;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -28,7 +35,7 @@ public class MetadataTest {
     private long metadataExpireMs = 1000;
     private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);
     private AtomicReference<String> backgroundError = new AtomicReference<String>();
-    
+
     @After
     public void tearDown() {
         assertNull("Exception in background thread : " + backgroundError.get(), backgroundError.get());
@@ -106,6 +113,85 @@ public class MetadataTest {
 
     }
 
+    @Test
+    public void testUpdateWithNeedMetadataForAllTopics() {
+        long time = 0;
+        metadata.update(Cluster.empty(), time);
+        metadata.needMetadataForAllTopics(true);
+
+        final List<String> expectedTopics = Collections.singletonList("topic");
+        metadata.setTopics(expectedTopics);
+        metadata.update(new Cluster(
+                Collections.singletonList(new Node(0, "host1", 1000)),
+                Arrays.asList(
+                    new PartitionInfo("topic", 0, null, null, null),
+                    new PartitionInfo("topic1", 0, null, null, null))),
+            100);
+
+        assertArrayEquals("Metadata got updated with wrong set of topics.",
+            expectedTopics.toArray(), metadata.topics().toArray());
+
+        metadata.needMetadataForAllTopics(false);
+    }
+
+    @Test
+    public void testListenerGetsNotifiedOfUpdate() {
+        long time = 0;
+        final Set<String> topics = new HashSet<>();
+        metadata.update(Cluster.empty(), time);
+        metadata.addListener(new Metadata.Listener() {
+            @Override
+            public void onMetadataUpdate(Cluster cluster) {
+                topics.clear();
+                topics.addAll(cluster.topics());
+            }
+        });
+
+        metadata.update(new Cluster(
+                Arrays.asList(new Node(0, "host1", 1000)),
+                Arrays.asList(
+                    new PartitionInfo("topic", 0, null, null, null),
+                    new PartitionInfo("topic1", 0, null, null, null))),
+            100);
+
+        assertEquals("Listener did not update topics list correctly",
+            new HashSet<>(Arrays.asList("topic", "topic1")), topics);
+    }
+
+    @Test
+    public void testListenerCanUnregister() {
+        long time = 0;
+        final Set<String> topics = new HashSet<>();
+        metadata.update(Cluster.empty(), time);
+        final Metadata.Listener listener = new Metadata.Listener() {
+            @Override
+            public void onMetadataUpdate(Cluster cluster) {
+                topics.clear();
+                topics.addAll(cluster.topics());
+            }
+        };
+        metadata.addListener(listener);
+
+        metadata.update(new Cluster(
+                Collections.singletonList(new Node(0, "host1", 1000)),
+                Arrays.asList(
+                    new PartitionInfo("topic", 0, null, null, null),
+                    new PartitionInfo("topic1", 0, null, null, null))),
+            100);
+
+        metadata.removeListener(listener);
+
+        metadata.update(new Cluster(
+                Arrays.asList(new Node(0, "host1", 1000)),
+                Arrays.asList(
+                    new PartitionInfo("topic2", 0, null, null, null),
+                    new PartitionInfo("topic3", 0, null, null, null))),
+            100);
+
+        assertEquals("Listener did not update topics list correctly",
+            new HashSet<>(Arrays.asList("topic", "topic1")), topics);
+    }
+
 
     private Thread asyncFetch(final String topic) {
         Thread thread = new Thread() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd123965/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index e4830b1..eb2c570 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -22,7 +22,9 @@ import static org.junit.Assert.assertTrue;
 import static java.util.Arrays.asList;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.regex.Pattern;
 
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
@@ -34,6 +36,8 @@ public class SubscriptionStateTest {
     private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST);
     private final TopicPartition tp0 = new TopicPartition("test", 0);
     private final TopicPartition tp1 = new TopicPartition("test", 1);
+    private final MockSubscriptionListener mockSubscriptionListener = new
+        MockSubscriptionListener();
 
     @Test
     public void partitionAssignment() {
@@ -144,5 +148,70 @@ public class SubscriptionStateTest {
         assertEquals(offset, state.fetched(tp));
         assertEquals(offset, state.consumed(tp));
     }
+
+    @Test(expected = IllegalStateException.class)
+    public void cantSubscribeTopicAndPattern() {
+        state.subscribe(Arrays.asList("test"), mockSubscriptionListener);
+        state.subscribe(Pattern.compile(".*"), listener);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void cantSubscribePartitionAndPattern() {
+        state.assign(Arrays.asList(new TopicPartition("test", 0)));
+        state.subscribe(Pattern.compile(".*"), listener);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void cantSubscribePatternAndTopic() {
+        state.subscribe(Pattern.compile(".*"), listener);
+        state.subscribe(Arrays.asList("test"), mockSubscriptionListener);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void cantSubscribePatternAndPartition() {
+        state.subscribe(Pattern.compile(".*"), listener);
+        state.assign(Arrays.asList(new TopicPartition("test", 0)));
+    }
+
+    @Test
+    public void patternSubscription() {
+        state.subscribe(Pattern.compile(".*"), listener);
+        state.changeSubscription(Arrays.asList("test", "test1"));
+
+        assertEquals(
+            "Expected subscribed topics count is incorrect", 2, state.subscription().size());
+    }
+
+    @Test
+    public void patternUnsubscription() {
+        state.subscribe(Pattern.compile(".*"), listener);
+        state.changeSubscription(Arrays.asList("test", "test1"));
+
+        state.unsubscribe();
+
+        assertEquals(
+            "Expected subscribed topics count is incorrect", 0, state.subscription().size());
+    }
+
+    private static class MockSubscriptionListener extends SubscriptionState.RebalanceListener
{
+        public Collection<TopicPartition> revoked;
+        public Collection<TopicPartition> assigned;
+        public int revokedCount = 0;
+        public int assignedCount = 0;
+
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            this.assigned = partitions;
+            assignedCount++;
+        }
+
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            this.revoked = partitions;
+            revokedCount++;
+        }
+
+    }
     
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd123965/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index c1e5d02..b393692 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -12,11 +12,11 @@
  */
 package kafka.api
 
-import java.{util, lang}
+import java.util.regex.Pattern
+import java.{lang, util}
 
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.TopicPartition
 
@@ -57,6 +57,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
   this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+  this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
 
   @Before
   override def setUp() {
@@ -89,6 +90,92 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   }
 
   @Test
+  def testPatternSubscription() {
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    val topic1: String = "tblablac" // matches subscribed pattern
+    TestUtils.createTopic(this.zkClient, topic1, 2, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(topic1, 0))
+    sendRecords(1000, new TopicPartition(topic1, 1))
+
+    val topic2: String = "tblablak" // does not match subscribed pattern
+    TestUtils.createTopic(this.zkClient, topic2, 2, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(topic2, 0))
+    sendRecords(1000, new TopicPartition(topic2, 1))
+
+    val topic3: String = "tblab1" // does not match subscribed pattern
+    TestUtils.createTopic(this.zkClient, topic3, 2, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(topic3, 0))
+    sendRecords(1000, new TopicPartition(topic3, 1))
+
+    assertEquals(0, this.consumers(0).assignment().size)
+
+    val pattern: Pattern = Pattern.compile("t.*c")
+    this.consumers(0).subscribe(pattern, new TestConsumerReassignmentListener)
+    this.consumers(0).poll(50)
+
+    var subscriptions = Set(
+      new TopicPartition(topic, 0),
+      new TopicPartition(topic, 1),
+      new TopicPartition(topic1, 0),
+      new TopicPartition(topic1, 1))
+
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
+
+    val topic4: String = "tsomec" // matches subscribed pattern
+    TestUtils.createTopic(this.zkClient, topic4, 2, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(topic4, 0))
+    sendRecords(1000, new TopicPartition(topic4, 1))
+
+    subscriptions = subscriptions ++ Set(
+      new TopicPartition(topic4, 0),
+      new TopicPartition(topic4, 1))
+
+
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
+
+    this.consumers(0).unsubscribe()
+    assertEquals(0, this.consumers(0).assignment().size)
+  }
+
+  @Test
+  def testPatternUnsubscription() {
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    val topic1: String = "tblablac" // matches subscribed pattern
+    TestUtils.createTopic(this.zkClient, topic1, 2, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(topic1, 0))
+    sendRecords(1000, new TopicPartition(topic1, 1))
+
+    assertEquals(0, this.consumers(0).assignment().size)
+
+    this.consumers(0).subscribe(Pattern.compile("t.*c"), new TestConsumerReassignmentListener)
+    this.consumers(0).poll(50)
+
+    val subscriptions = Set(
+      new TopicPartition(topic, 0),
+      new TopicPartition(topic, 1),
+      new TopicPartition(topic1, 0),
+      new TopicPartition(topic1, 1))
+
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
+
+    this.consumers(0).unsubscribe()
+    assertEquals(0, this.consumers(0).assignment().size)
+  }
+
+  @Test
   def testCommitSpecifiedOffsets() {
     sendRecords(5, tp)
     sendRecords(7, tp2)


Mime
View raw message