kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3854: Fix issues with new consumer's subsequent regex (pattern) subscriptions
Date Mon, 04 Jul 2016 22:42:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 7b01f848a -> cdf019a82


KAFKA-3854: Fix issues with new consumer's subsequent regex (pattern) subscriptions

This patch fixes two issues:
1. Subsequent regex subscriptions fail with the new consumer.
2. Subsequent regex subscriptions would not immediately refresh metadata to change the subscription
of the new consumer and trigger a rebalance.

The final note on the JIRA stating that a later created topic that matches a consumer's subscription
pattern would not be assigned to the consumer upon creation seems to be as designed. A repeat
`subscribe()` to the same pattern or some wait time until the next automatic metadata refresh
would handle that case.

An integration test was also added to verify these issues are fixed with this PR.

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1572 from vahidhashemian/KAFKA-3854


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cdf019a8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cdf019a8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cdf019a8

Branch: refs/heads/0.10.0
Commit: cdf019a8249f95bb0080202b6f806a292a9dc8ef
Parents: 7b01f84
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Mon Jul 4 14:09:30 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Jul 4 15:00:36 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  1 +
 .../consumer/internals/SubscriptionState.java   | 41 +++++++---
 .../kafka/api/PlaintextConsumerTest.scala       | 83 +++++++++++++++++++-
 3 files changed, 113 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cdf019a8/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 2784644..9ee6c95 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -853,6 +853,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
             log.debug("Subscribed to pattern: {}", pattern);
             this.subscriptions.subscribe(pattern, listener);
             this.metadata.needMetadataForAllTopics(true);
+            this.metadata.requestUpdate();
         } finally {
             release();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cdf019a8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index ec35115..2412d36 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
  * specific language governing permissions and limitations under the License.
@@ -47,6 +47,13 @@ import java.util.regex.Pattern;
  */
 public class SubscriptionState {
 
+    private enum SubscriptionType {
+        NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
+    };
+
+    /* the type of subscription */
+    private SubscriptionType subscriptionType;
+
     /* the pattern user has requested */
     private Pattern subscribedPattern;
 
@@ -77,6 +84,19 @@ public class SubscriptionState {
     private static final String SUBSCRIPTION_EXCEPTION_MESSAGE =
         "Subscription to topics, partitions and pattern are mutually exclusive";
 
+    /**
+     * This method sets the subscription type if it is not already set (i.e. when it is NONE),
+     * or verifies that the subscription type is equal to the give type when it is set (i.e.
+     * when it is not NONE)
+     * @param type The given subscription type
+     */
+    private void setSubscriptionType(SubscriptionType type) {
+        if (this.subscriptionType == SubscriptionType.NONE)
+            this.subscriptionType = type;
+        else if (this.subscriptionType != type)
+            throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
+    }
+
     public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
         this.defaultResetStrategy = defaultResetStrategy;
         this.subscription = new HashSet<>();
@@ -86,14 +106,14 @@ public class SubscriptionState {
         this.needsPartitionAssignment = false;
         this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to
fetch offset upon starting up
         this.subscribedPattern = null;
+        this.subscriptionType = SubscriptionType.NONE;
     }
 
     public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
{
         if (listener == null)
             throw new IllegalArgumentException("RebalanceListener cannot be null");
 
-        if (!this.userAssignment.isEmpty() || this.subscribedPattern != null)
-            throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
+        setSubscriptionType(SubscriptionType.AUTO_TOPICS);
 
         this.listener = listener;
 
@@ -122,7 +142,7 @@ public class SubscriptionState {
      * @param topics The topics to add to the group subscription
      */
     public void groupSubscribe(Collection<String> topics) {
-        if (!this.userAssignment.isEmpty())
+        if (this.subscriptionType == SubscriptionType.USER_ASSIGNED)
             throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
         this.groupSubscription.addAll(topics);
     }
@@ -138,8 +158,7 @@ public class SubscriptionState {
      * whose input partitions are provided from the subscribed topics.
      */
     public void assignFromUser(Collection<TopicPartition> partitions) {
-        if (!this.subscription.isEmpty() || this.subscribedPattern != null)
-            throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
+        setSubscriptionType(SubscriptionType.USER_ASSIGNED);
 
         this.userAssignment.clear();
         this.userAssignment.addAll(partitions);
@@ -171,15 +190,14 @@ public class SubscriptionState {
         if (listener == null)
             throw new IllegalArgumentException("RebalanceListener cannot be null");
 
-        if (!this.subscription.isEmpty() || !this.userAssignment.isEmpty())
-            throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
+        setSubscriptionType(SubscriptionType.AUTO_PATTERN);
 
         this.listener = listener;
         this.subscribedPattern = pattern;
     }
 
     public boolean hasPatternSubscription() {
-        return subscribedPattern != null;
+        return this.subscriptionType == SubscriptionType.AUTO_PATTERN;
     }
 
     public void unsubscribe() {
@@ -188,6 +206,7 @@ public class SubscriptionState {
         this.assignment.clear();
         this.needsPartitionAssignment = true;
         this.subscribedPattern = null;
+        this.subscriptionType = SubscriptionType.NONE;
     }
 
 
@@ -270,7 +289,7 @@ public class SubscriptionState {
     }
 
     public boolean partitionsAutoAssigned() {
-        return !this.subscription.isEmpty();
+        return this.subscriptionType == SubscriptionType.AUTO_TOPICS || this.subscriptionType
== SubscriptionType.AUTO_PATTERN;
     }
 
     public void position(TopicPartition tp, long offset) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cdf019a8/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index b22ccde..7db125a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -127,6 +127,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 1, startingOffset
= 0)
   }
 
+  /**
+   * Verifies that pattern subscription performs as expected.
+   * The pattern matches the topics 'topic' and 'tblablac', but not 'tblablak' or 'tblab1'.
+   * It is expected that the consumer is subscribed to all partitions of 'topic' and
+   * 'tblablac' after the subscription when metadata is refreshed.
+   * When a new topic 'tsomec' is added afterwards, it is expected that upon the next
+   * metadata refresh the consumer becomes subscribed to this new topic and all partitions
+   * of that topic are assigned to it.
+   */
   @Test
   def testPatternSubscription() {
     val numRecords = 10000
@@ -183,12 +192,84 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(0, this.consumers(0).assignment().size)
   }
 
+  /**
+   * Verifies that a second call to pattern subscription succeeds and performs as expected.
+   * The initial subscription is to a pattern that matches two topics 'topic' and 'foo'.
+   * The second subscription is to a pattern that matches 'foo' and a new topic 'bar'.
+   * It is expected that the consumer is subscribed to all partitions of 'topic' and 'foo'
after
+   * the first subscription, and to all partitions of 'foo' and 'bar' after the second.
+   * The metadata refresh interval is intentionally increased to a large enough value to
guarantee
+   * that it is the subscription call that triggers a metadata refresh, and not the timeout.
+   */
+  @Test
+  def testSubsequentPatternSubscription() {
+    this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "30000")
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new
ByteArrayDeserializer())
+    consumers += consumer0
+
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    // the first topic ('topic')  matches first subscription pattern only
+
+    val fooTopic = "foo" // matches both subscription patterns
+    TestUtils.createTopic(this.zkUtils, fooTopic, 1, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(fooTopic, 0))
+
+    assertEquals(0, consumer0.assignment().size)
+
+    val pattern1 = Pattern.compile(".*o.*") // only 'topic' and 'foo' match this 
+    consumer0.subscribe(pattern1, new TestConsumerReassignmentListener)
+    consumer0.poll(50)
+
+    var subscriptions = Set(
+      new TopicPartition(topic, 0),
+      new TopicPartition(topic, 1),
+      new TopicPartition(fooTopic, 0))
+
+    TestUtils.waitUntilTrue(() => {
+      consumer0.poll(50)
+      consumer0.assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer0.assignment()}")
+
+    val barTopic = "bar" // matches the next subscription pattern
+    TestUtils.createTopic(this.zkUtils, barTopic, 1, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(barTopic, 0))
+
+    val pattern2 = Pattern.compile("...") // only 'foo' and 'bar' match this
+    consumer0.subscribe(pattern2, new TestConsumerReassignmentListener)
+    consumer0.poll(50)
+
+    subscriptions --= Set(
+      new TopicPartition(topic, 0),
+      new TopicPartition(topic, 1))
+
+    subscriptions ++= Set(
+      new TopicPartition(barTopic, 0))
+
+    TestUtils.waitUntilTrue(() => {
+      consumer0.poll(50)
+      consumer0.assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer0.assignment()}")
+
+    consumer0.unsubscribe()
+    assertEquals(0, consumer0.assignment().size)
+  }
+
+  /**
+   * Verifies that pattern unsubscription performs as expected.
+   * The pattern matches the topics 'topic' and 'tblablac'.
+   * It is expected that the consumer is subscribed to all partitions of 'topic' and
+   * 'tblablac' after the subscription when metadata is refreshed.
+   * When consumer unsubscribes from all its subscriptions, it is expected that its
+   * assignments are cleared right away.
+   */
   @Test
   def testPatternUnsubscription() {
     val numRecords = 10000
     sendRecords(numRecords)
 
-    val topic1 = "tblablac" // matches subscribed pattern
+    val topic1 = "tblablac" // matches the subscription pattern
     TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
     sendRecords(1000, new TopicPartition(topic1, 0))
     sendRecords(1000, new TopicPartition(topic1, 1))


Mime
View raw message