kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-6005; Reject JoinGroup request from first member with empty protocol type/protocol list
Date Tue, 03 Oct 2017 15:40:37 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3dcbbf703 -> 42b356500


KAFKA-6005; Reject JoinGroup request from first member with empty protocol type/protocol list

Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3957 from omkreddy/JOIN-GROUP-EMPTY-PROTOCOL


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/42b35650
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/42b35650
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/42b35650

Branch: refs/heads/trunk
Commit: 42b356500b7188eb2507f9b48399d5491a7eff16
Parents: 3dcbbf7
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Authored: Tue Oct 3 08:37:30 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Tue Oct 3 08:37:30 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 37 +++++++++++++++-----
 .../apache/kafka/common/protocol/Errors.java    |  3 +-
 .../clients/consumer/KafkaConsumerTest.java     | 33 +++++++++--------
 .../coordinator/group/GroupCoordinator.scala    |  3 ++
 .../group/GroupCoordinatorTest.scala            | 16 +++++++++
 5 files changed, 65 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/42b35650/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d6764ca..6fb6919 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -566,6 +566,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final long retryBackoffMs;
     private final long requestTimeoutMs;
     private volatile boolean closed = false;
+    private List<PartitionAssignor> assignors;
 
     // currentThread holds the threadId of the current thread accessing KafkaConsumer
     // and is used to prevent multi-threaded access
@@ -730,7 +731,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
             this.subscriptions = new SubscriptionState(offsetResetStrategy);
-            List<PartitionAssignor> assignors = config.getConfiguredInstances(
+            this.assignors = config.getConfiguredInstances(
                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                     PartitionAssignor.class);
             this.coordinator = new ConsumerCoordinator(logContext,
@@ -797,7 +798,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                   SubscriptionState subscriptions,
                   Metadata metadata,
                   long retryBackoffMs,
-                  long requestTimeoutMs) {
+                  long requestTimeoutMs,
+                  List<PartitionAssignor> assignors) {
         this.log = logContext.logger(getClass());
         this.clientId = clientId;
         this.coordinator = coordinator;
@@ -812,6 +814,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         this.metadata = metadata;
         this.retryBackoffMs = retryBackoffMs;
         this.requestTimeoutMs = requestTimeoutMs;
+        this.assignors = assignors;
     }
 
     /**
@@ -874,7 +877,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *                 subscribed topics
      * @throws IllegalArgumentException If topics is null or contains null or empty elements, or if listener is null
      * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
-     *                               previously (without a subsequent call to {@link #unsubscribe()})
+     *                               previously (without a subsequent call to {@link #unsubscribe()}), or if not
+     *                               configured at-least one partition assignment strategy
      */
     @Override
     public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
@@ -890,6 +894,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     if (topic == null || topic.trim().isEmpty())
                         throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
                 }
+
+                throwIfNoAssignorsConfigured();
+
                 log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
                 this.subscriptions.subscribe(new HashSet<>(topics), listener);
                 metadata.setTopics(subscriptions.groupSubscription());
@@ -917,7 +924,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param topics The list of topics to subscribe to
      * @throws IllegalArgumentException If topics is null or contains null or empty elements
      * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
-     *                               previously (without a subsequent call to {@link #unsubscribe()})
+     *                               previously (without a subsequent call to {@link #unsubscribe()}), or if not
+     *                               configured at-least one partition assignment strategy
      */
     @Override
     public void subscribe(Collection<String> topics) {
@@ -943,7 +951,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *                 subscribed topics
      * @throws IllegalArgumentException If pattern or listener is null
      * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
-     *                               previously (without a subsequent call to {@link #unsubscribe()})
+     *                               previously (without a subsequent call to {@link #unsubscribe()}), or if not
+     *                               configured at-least one partition assignment strategy
      */
     @Override
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
@@ -951,6 +960,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         try {
             if (pattern == null)
                 throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
+
+            throwIfNoAssignorsConfigured();
+
             log.debug("Subscribed to pattern: {}", pattern);
             this.subscriptions.subscribe(pattern, listener);
             this.metadata.needMetadataForAllTopics(true);
@@ -974,7 +986,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param pattern Pattern to subscribe to
      * @throws IllegalArgumentException If pattern is null
      * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
-     *                               previously (without a subsequent call to {@link #unsubscribe()})
+     *                               previously (without a subsequent call to {@link #unsubscribe()}), or if not
+     *                               configured at-least one partition assignment strategy
      */
     @Override
     public void subscribe(Pattern pattern) {
@@ -1568,7 +1581,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
      *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
      *         such message.
-     * @throws AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws IllegalArgumentException if the target timestamp is negative.
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
      *         expiration of the configured request timeout
@@ -1672,7 +1685,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param timeout The maximum time to wait for consumer to close gracefully. The value must be
      *                non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
      * @param timeUnit The time unit for the {@code timeout}
-     * @throws AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws InterruptException If the thread is interrupted before or while this function is called
      * @throws IllegalArgumentException If the {@code timeout} is negative.
      */
@@ -1742,7 +1755,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * or reset it using the offset reset policy the user has configured.
      *
      * @param partitions The partitions that needs updating fetch positions
-     * @throws AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
      *             defined
      */
@@ -1797,4 +1810,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         if (refcount.decrementAndGet() == 0)
             currentThread.set(NO_CURRENT_THREAD);
     }
+
+    private void throwIfNoAssignorsConfigured() {
+        if (assignors.isEmpty())
+            throw new IllegalStateException("Must configure at least one partition assigner class name to " +
+                ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/42b35650/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index bea6050..d937054 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -266,7 +266,8 @@ public enum Errors {
             }
         }),
     INCONSISTENT_GROUP_PROTOCOL(23,
-            "The group member's supported protocols are incompatible with those of existing members.",
+            "The group member's supported protocols are incompatible with those of existing members" +
+                " or first group member tried to join with empty protocol type or empty protocol list.",
         new ApiExceptionBuilder() {
             @Override
             public ApiException build(String message) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/42b35650/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c5e2213..632bec0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -235,12 +235,22 @@ public class KafkaConsumerTest {
         }
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testSeekNegative() {
+    @Test(expected = IllegalStateException.class)
+    public void testSubscriptionWithEmptyPartitionAssignment() {
         Properties props = new Properties();
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSeekNegative");
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "");
+
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        try {
+            consumer.subscribe(singletonList(topic));
+        } finally {
+            consumer.close();
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSeekNegative() {
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();
         try {
             consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0)));
@@ -252,10 +262,6 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnNullTopicPartition() {
-        Properties props = new Properties();
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testAssignOnNullTopicPartition");
-        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();
         try {
             consumer.assign(null);
@@ -277,10 +283,6 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnNullTopicInPartition() {
-        Properties props = new Properties();
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testAssignOnNullTopicInPartition");
-        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();
         try {
             consumer.assign(Arrays.asList(new TopicPartition(null, 0)));
@@ -291,10 +293,6 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnEmptyTopicInPartition() {
-        Properties props = new Properties();
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testAssignOnEmptyTopicInPartition");
-        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();
         try {
             consumer.assign(Arrays.asList(new TopicPartition("  ", 0)));
@@ -1678,7 +1676,8 @@ public class KafkaConsumerTest {
                 subscriptions,
                 metadata,
                 retryBackoffMs,
-                requestTimeoutMs);
+                requestTimeoutMs,
+                assignors);
     }
 
     private static class FetchInfo {

http://git-wip-us.apache.org/repos/asf/kafka/blob/42b35650/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 42bc3c3..bb59bcd 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -146,6 +146,9 @@ class GroupCoordinator(val brokerId: Int,
       if (!group.is(Empty) && (!group.protocolType.contains(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
         // if the new member does not support the group protocol, reject it
         responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+      } else if (group.is(Empty) && (protocols.isEmpty || protocolType.isEmpty)) {
+        //reject if first member with empty group protocol or protocolType is empty
+        responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
       } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
         // if the member trying to register with a un-recognized id, send the response to let
         // it reset its member id and retry

http://git-wip-us.apache.org/repos/asf/kafka/blob/42b35650/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 95abb33..85d72c3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -186,6 +186,22 @@ class GroupCoordinatorTest extends JUnitSuite {
   }
 
   @Test
+  def testJoinGroupWithEmptyProtocolType() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, "", protocols)
+    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
+  }
+
+  @Test
+  def testJoinGroupWithEmptyGroupProtocol() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, List())
+    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
+  }
+
+  @Test
   def testJoinGroupInconsistentGroupProtocol() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 


Mime
View raw message