kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-2596: reject commits from unknown groups with positive generations
Date Fri, 09 Oct 2015 06:35:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5a921a323 -> 6a06e22ef


KAFKA-2596: reject commits from unknown groups with positive generations

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Onur Karaman, Guozhang Wang

Closes #267 from hachikuji/KAFKA-2596
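
The gist of the change: previously the broker-side ConsumerCoordinator blindly stored offsets for any group it had no metadata for. With this patch, an offset commit that carries a positive generation id for an unknown group is rejected with ILLEGAL_GENERATION, while a commit using the default (negative) generation id is still accepted as coming from a consumer that does not rely on Kafka for group management. On the client, unsubscribing now also resets the generation/consumerId tracked by the Coordinator, so a later commit under manual assignment does not reuse stale values. Below is a minimal, illustrative Scala distillation of the new broker-side decision (the object and method names are made up for the example and are not part of the patch):

    import org.apache.kafka.common.protocol.Errors

    object CommitPolicy {
      // For a group the coordinator has no metadata for: allow the commit only if the
      // client is not relying on Kafka-based group management, i.e. it sent the default
      // (negative) generation id; a positive generation id from an unknown group is rejected.
      def errorForUnknownGroup(generationId: Int): Short =
        if (generationId < 0) Errors.NONE.code
        else Errors.ILLEGAL_GENERATION.code
    }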


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6a06e22e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6a06e22e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6a06e22e

Branch: refs/heads/trunk
Commit: 6a06e22ef81aae3335ca59c94cb0fc33e9f4ceff
Parents: 5a921a3
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Oct 8 23:39:17 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Oct 8 23:39:17 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  1 +
 .../clients/consumer/internals/Coordinator.java | 10 +++-
 .../consumer/internals/CoordinatorTest.java     | 34 ++++++++++++
 .../kafka/coordinator/ConsumerCoordinator.scala | 11 ++--
 .../ConsumerCoordinatorResponseTest.scala       | 56 ++++++++++++++++++--
 5 files changed, 104 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6a06e22e/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index a0d04bc..ceba667 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -727,6 +727,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         acquire();
         try {
             this.subscriptions.unsubscribe();
+            this.coordinator.resetGeneration();
             this.metadata.needMetadataForAllTopics(false);
             this.metadata.removeListener(metadataListener);
         } finally {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6a06e22e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 8326549..701a81b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -101,7 +101,7 @@ public final class Coordinator implements Closeable {
                        long autoCommitIntervalMs) {
         this.client = client;
         this.time = time;
-        this.generation = -1;
+        this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
         this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
         this.groupId = groupId;
         this.consumerCoordinator = null;
@@ -438,6 +438,14 @@ public final class Coordinator implements Closeable {
     }
 
     /**
+     * Reset the generation/consumerId tracked by this consumer.
+     */
+    public void resetGeneration() {
+        this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
+        this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
+    }
+
+    /**
      * Commit offsets for the specified list of topics and partitions. This is a non-blocking call
      * which returns a request future that can be polled in the case of a synchronous commit or ignored in the
      * asynchronous case.

http://git-wip-us.apache.org/repos/asf/kafka/blob/6a06e22e/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index 8e3c98e..66b2e32 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -39,6 +40,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.ConsumerMetadataResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.utils.MockTime;
@@ -361,6 +363,38 @@ public class CoordinatorTest {
     }
 
     @Test
+    public void testResetGeneration() {
+        // enable auto-assignment
+        subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
+        coordinator.ensurePartitionAssignment();
+
+        // now switch to manual assignment
+        subscriptions.unsubscribe();
+        coordinator.resetGeneration();
+        subscriptions.assign(Arrays.asList(tp));
+
+        // the client should not reuse generation/consumerId from auto-subscribed generation
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body());
+                return commitRequest.consumerId().equals(OffsetCommitRequest.DEFAULT_CONSUMER_ID) &&
+                        commitRequest.generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
+            }
+        }, offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+
+        AtomicBoolean success = new AtomicBoolean(false);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
+        consumerClient.poll(0);
+        assertTrue(success.get());
+    }
+
+    @Test
     public void testCommitOffsetAsyncFailedWithDefaultCallback() {
         int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/6a06e22e/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 64e21c5..2cdab85 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -236,10 +236,13 @@ class ConsumerCoordinator(val brokerId: Int,
     } else {
       val group = coordinatorMetadata.getGroup(groupId)
       if (group == null) {
-        // if the group does not exist, it means this group is not relying
-        // on Kafka for partition management, and hence never send join-group
-        // request to the coordinator before; in this case blindly commit the offsets
-        offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback)
+        if (generationId < 0)
+          // the group is not relying on Kafka for partition management, so allow the commit
+          offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback)
+        else
+          // the group has failed over to this coordinator (which will be handled in KAFKA-2017),
+          // or this is a request coming from an older generation. either way, reject the commit
+          responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
       } else {
         group synchronized {
           if (group.is(Dead)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6a06e22e/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
index 07f7326..3e763c3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
@@ -21,12 +21,12 @@ package kafka.coordinator
 import java.util.concurrent.TimeUnit
 
 import org.junit.Assert._
-import kafka.common.TopicAndPartition
+import kafka.common.{OffsetAndMetadata, TopicAndPartition}
 import kafka.server.{OffsetManager, KafkaConfig}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.JoinGroupRequest
-import org.easymock.EasyMock
+import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest}
+import org.easymock.{IAnswer, EasyMock}
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnitSuite
 
@@ -41,6 +41,8 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   type JoinGroupCallback = (Set[TopicAndPartition], String, Int, Short) => Unit
   type HeartbeatCallbackParams = Short
   type HeartbeatCallback = Short => Unit
+  type CommitOffsetCallbackParams = Map[TopicAndPartition, Short]
+  type CommitOffsetCallback = Map[TopicAndPartition, Short] => Unit
 
   val ConsumerMinSessionTimeout = 10
   val ConsumerMaxSessionTimeout = 200
@@ -232,6 +234,29 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   }
 
   @Test
+  def testCommitOffsetFromUnknownGroup() {
+    val groupId = "groupId"
+    val consumerId = "consumer"
+    val generationId = 1
+    val tp = new TopicAndPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+
+    val commitOffsetResult = commitOffsets(groupId, consumerId, generationId, Map(tp -> offset), true)
+    assertEquals(Errors.ILLEGAL_GENERATION.code, commitOffsetResult(tp))
+  }
+
+  @Test
+  def testCommitOffsetWithDefaultGeneration() {
+    val groupId = "groupId"
+    val tp = new TopicAndPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+
+    val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_CONSUMER_ID,
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset), true)
+    assertEquals(Errors.NONE.code, commitOffsetResult(tp))
+  }
+
+  @Test
   def testHeartbeatDuringRebalanceCausesRebalanceInProgress() {
     val groupId = "groupId"
     val partitionAssignmentStrategy = "range"
@@ -291,6 +316,13 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
     (responseFuture, responseCallback)
   }
 
+  private def setupCommitOffsetsCallback: (Future[CommitOffsetCallbackParams], CommitOffsetCallback) = {
+    val responsePromise = Promise[CommitOffsetCallbackParams]
+    val responseFuture = responsePromise.future
+    val responseCallback: CommitOffsetCallback = offsets => responsePromise.success(offsets)
+    (responseFuture, responseCallback)
+  }
+
   private def sendJoinGroup(groupId: String,
                             consumerId: String,
                             partitionAssignmentStrategy: String,
@@ -325,4 +357,22 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
     consumerCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback)
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
+
+  private def commitOffsets(groupId: String,
+                            consumerId: String,
+                            generationId: Int,
+                            offsets: Map[TopicAndPartition, OffsetAndMetadata],
+                            isCoordinatorForGroup: Boolean): CommitOffsetCallbackParams = {
+    val (responseFuture, responseCallback) = setupCommitOffsetsCallback
+    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
+    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
+    val storeOffsetAnswer = new IAnswer[Unit] {
+      override def answer = responseCallback.apply(offsets.mapValues(_ => Errors.NONE.code))
+    }
+    EasyMock.expect(offsetManager.storeOffsets(groupId, consumerId, generationId, offsets, responseCallback))
+      .andAnswer(storeOffsetAnswer)
+    EasyMock.replay(offsetManager)
+    consumerCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
+    Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
+  }
 }

