kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2648: enforce non-empty group-ids in join-group request
Date Wed, 28 Oct 2015 21:08:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 983b1f9e1 -> 443cd9ab0


KAFKA-2648: enforce non-empty group-ids in join-group request

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang

Closes #362 from hachikuji/KAFKA-2648


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/443cd9ab
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/443cd9ab
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/443cd9ab

Branch: refs/heads/trunk
Commit: 443cd9ab0db5223b7c85ac8333f2fc1a0e554870
Parents: 983b1f9
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Oct 28 14:14:11 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Oct 28 14:14:11 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java    |  8 ++++----
 .../consumer/internals/AbstractCoordinator.java  |  3 ++-
 .../org/apache/kafka/common/protocol/Errors.java |  2 ++
 .../internals/ConsumerCoordinatorTest.java       | 19 +++++++++++++++++++
 .../kafka/coordinator/GroupCoordinator.scala     |  6 ++++++
 .../GroupCoordinatorResponseTest.scala           | 13 +++++++++++++
 6 files changed, 46 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/443cd9ab/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 7aef8a3..bc9ef21 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -15,8 +15,8 @@ package org.apache.kafka.clients.consumer;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -565,7 +565,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
                 config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
                 this.valueDeserializer = valueDeserializer;
             }
-            this.fetcher = new Fetcher<K, V>(this.client,
+            this.fetcher = new Fetcher<>(this.client,
                     config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                     config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
@@ -774,9 +774,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
      *            immediately with any records available now. Must not be negative.
      * @return map of topic to records since the last fetch for the subscribed list of topics
and partitions
      *
-     * @throws NoOffsetForPartitionException If there is no stored offset for a subscribed
partition and no automatic
+     * @throws NoOffsetForPartitionException if there is no stored offset for a subscribed
partition and no automatic
      *             offset reset policy has been configured.
-     * @throws org.apache.kafka.common.errors.OffsetOutOfRangeException If there is OffsetOutOfRange
error in fetchResponse and
+     * @throws org.apache.kafka.common.errors.OffsetOutOfRangeException if there is OffsetOutOfRange
error in fetchResponse and
      *         the defaultResetPolicy is NONE
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called
before or while this function is called
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/443cd9ab/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 8d5ee16..4b2a824 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -365,7 +365,8 @@ public abstract class AbstractCoordinator {
                         groupId);
                 future.raise(Errors.forCode(errorCode));
             } else if (errorCode == Errors.INCONSISTENT_GROUP_PROTOCOL.code()
-                    || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) {
+                    || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()
+                    || errorCode == Errors.INVALID_GROUP_ID.code()) {
                 // log the error and re-throw the exception
                 Errors error = Errors.forCode(errorCode);
                 log.error("Attempt to join group {} failed due to: {}",

http://git-wip-us.apache.org/repos/asf/kafka/blob/443cd9ab/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 9184d11..bc607f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -78,6 +78,8 @@ public enum Errors {
             new IllegalGenerationException("Specified group generation id is not valid.")),
     INCONSISTENT_GROUP_PROTOCOL(23,
             new ApiException("The group member's supported protocols are incompatible with
those of existing members.")),
+    INVALID_GROUP_ID(24,
+            new ApiException("The configured groupId is invalid")),
     UNKNOWN_MEMBER_ID(25,
             new UnknownMemberIdException("The coordinator is not aware of this member.")),
     INVALID_SESSION_TIMEOUT(26,

http://git-wip-us.apache.org/repos/asf/kafka/blob/443cd9ab/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index a0baccd..963da42 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -255,6 +255,25 @@ public class ConsumerCoordinatorTest {
         assertTrue(coordinator.coordinatorUnknown());
     }
 
+    @Test(expected = ApiException.class)
+    public void testJoinGroupInvalidGroupId() {
+        final String consumerId = "leader";
+
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        // ensure metadata is up-to-date for leader
+        metadata.setTopics(Arrays.asList(topicName));
+        metadata.update(cluster, time.milliseconds());
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.<String,
List<String>>emptyMap(),
+                Errors.INVALID_GROUP_ID.code()));
+        coordinator.ensurePartitionAssignment();
+    }
+
     @Test
     public void testNormalJoinGroupLeader() {
         final String consumerId = "leader";

http://git-wip-us.apache.org/repos/asf/kafka/blob/443cd9ab/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index ef94289..0ef542d 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -115,6 +115,8 @@ class GroupCoordinator(val brokerId: Int,
                       responseCallback: JoinCallback) {
     if (!isActive.get) {
       responseCallback(joinError(memberId, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))
+    } else if (!validGroupId(groupId)) {
+      responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID.code))
     } else if (!isCoordinatorForGroup(groupId)) {
       responseCallback(joinError(memberId,Errors.NOT_COORDINATOR_FOR_GROUP.code))
     } else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
@@ -409,6 +411,10 @@ class GroupCoordinator(val brokerId: Int,
     offsetManager.removeOffsetsFromCacheForPartition(offsetTopicPartitionId)
   }
 
+  private def validGroupId(groupId: String): Boolean = {
+    groupId != null && !groupId.isEmpty
+  }
+
   private def joinError(memberId: String, errorCode: Short): JoinGroupResult = {
     JoinGroupResult(
       members=Map.empty,

http://git-wip-us.apache.org/repos/asf/kafka/blob/443cd9ab/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index cdd78ef..5eaaea8 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -126,6 +126,19 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   }
 
   @Test
+  def testInvalidGroupId() {
+    val groupId = ""
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType,
+      protocols, isCoordinatorForGroup = true)
+    assertEquals(Errors.INVALID_GROUP_ID.code, joinGroupResult.errorCode)
+  }
+
+  @Test
   def testValidJoinGroup() {
     val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID


Mime
View raw message