kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6829: retry commits on unknown topic or partition (#4948)
Date Thu, 03 May 2018 00:01:37 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 04a70bd  KAFKA-6829: retry commits on unknown topic or partition (#4948)
04a70bd is described below

commit 04a70bd3fe40628462f63955c8522cae625feee3
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Wed May 2 20:01:28 2018 -0400

    KAFKA-6829: retry commits on unknown topic or partition (#4948)
    
    For the UNKNOWN_TOPIC_OR_PARTITION error, we could change the consumer's behavior to retry
after this error. This is a rare case, since the user would not commit offsets for topics
unless they had been able to fetch from them, but this doesn't really handle the situation
where the broker hasn't received any metadata updates.
    
    Reviewers: Jason Gustafson <jason@confluent.io>, John Roesler <john@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/clients/consumer/internals/ConsumerCoordinator.java  |  6 ++----
 .../clients/consumer/internals/ConsumerCoordinatorTest.java    | 10 ++++++----
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 3c99c96..eec070e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -757,7 +757,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         // raise the error to the user
                         future.raise(error);
                         return;
-                    } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                    } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS
+                            || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                         // just retry
                         future.raise(error);
                         return;
@@ -774,9 +775,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         resetGeneration();
                         future.raise(new CommitFailedException());
                         return;
-                    } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                        future.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
-                        return;
                     } else {
                         future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
                         return;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3e3c423..0304190 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1373,13 +1373,15 @@ public class ConsumerCoordinatorTest {
         assertEquals(Arrays.asList(firstOffset, secondOffset), committedOffsets);
     }
 
-    @Test(expected = KafkaException.class)
-    public void testCommitUnknownTopicOrPartition() {
+    @Test
+    public void testRetryCommitUnknownTopicOrPartition() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_TOPIC_OR_PARTITION);
-        coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
+        client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+
+        assertTrue(coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), 10000));
     }
 
     @Test(expected = OffsetMetadataTooLarge.class)

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message