kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2555: Fix infinite recursive ensurePartitionAssignment in callback's commitSync
Date Fri, 25 Sep 2015 18:58:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7e453df31 -> a07fbd0f1


KAFKA-2555: Fix infinite recursive ensurePartitionAssignment in callback's commitSync

hachikuji ewencp I found this problem when adding the new consumer to mirror maker, which commits
offsets in the rebalance callback. It is not clear to me why we are triggering a rebalance for
commitSync() and fetchCommittedOffsets(). Can you help review to see if I missed something?

Regarding commitSync(): after each poll(), each partition will either be assigned to the consumer
or already be revoked. As long as the user is using the internal offset map, the offset map
will always be valid, i.e. it will only ever contain the assigned partitions when commitSync()
is called. Hence there is no need to trigger a rebalance in commitSync().

The same guarantee also applies to fetchCommittedOffsets(); isn't the only requirement to
ensure that we know the coordinator?

Another related issue is that today the IllegalGenerationException is a bit confusing. When
we receive an IllegalGenerationException from a heartbeat, we need to use that same generation
ID to commit offsets, and the coordinator will accept it. So the generation ID was not really
illegal. I will file a ticket for this issue.

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Jason Gustafson, Guozhang Wang

Closes #221 from becketqin/KAFKA-2555


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a07fbd0f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a07fbd0f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a07fbd0f

Branch: refs/heads/trunk
Commit: a07fbd0f19962037ec386b1814dc6c95749c91a3
Parents: 7e453df
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Fri Sep 25 12:01:41 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Sep 25 12:01:41 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/internals/Coordinator.java    | 11 ++++++++---
 .../kafka/common/errors/IllegalGenerationException.java  |  2 +-
 .../kafka/common/errors/UnknownConsumerIdException.java  |  2 +-
 3 files changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a07fbd0f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 81a7c9c..0b31c22 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.UnknownConsumerIdException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -134,7 +136,6 @@ public final class Coordinator {
     public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition>
partitions) {
         while (true) {
             ensureCoordinatorKnown();
-            ensurePartitionAssignment();
 
             // contact coordinator to fetch committed offsets
             RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
@@ -197,7 +198,9 @@ public final class Coordinator {
             client.poll(future);
 
             if (future.failed()) {
-                if (!future.isRetriable())
+                if (future.exception() instanceof UnknownConsumerIdException)
+                    continue;
+                else if (!future.isRetriable())
                     throw future.exception();
                 Utils.sleep(retryBackoffMs);
             }
@@ -368,9 +371,11 @@ public final class Coordinator {
     }
 
     public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        if (offsets.isEmpty())
+            return;
+
         while (true) {
             ensureCoordinatorKnown();
-            ensurePartitionAssignment();
 
             RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
             client.poll(future);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a07fbd0f/clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
b/clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
index d20b74a..fe8ba7a 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
@@ -12,7 +12,7 @@
  */
 package org.apache.kafka.common.errors;
 
-public class IllegalGenerationException extends RetriableException {
+public class IllegalGenerationException extends ApiException {
     private static final long serialVersionUID = 1L;
 
     public IllegalGenerationException() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a07fbd0f/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
b/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
index 9bcbd11..28bfd72 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
@@ -12,7 +12,7 @@
  */
 package org.apache.kafka.common.errors;
 
-public class UnknownConsumerIdException extends RetriableException {
+public class UnknownConsumerIdException extends ApiException {
     private static final long serialVersionUID = 1L;
 
     public UnknownConsumerIdException() {


Mime
View raw message