kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-5556; Fix IllegalStateException in KafkaConsumer.commitSync due to missing future completion check
Date Thu, 20 Jul 2017 16:07:35 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fe4a469fc -> e0099e1f5


KAFKA-5556; Fix IllegalStateException in KafkaConsumer.commitSync due to missing future completion check

This PR makes the `commitOffsetsSync` method check whether the future has completed after
the client's poll before testing whether its failure is retriable.

Author: umesh chaudhary <umesh9794@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #3489 from umesh9794/KAFKA-5556
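
The condition changed by this commit can be illustrated with a small standalone sketch. It is not the client's code: `SketchFuture`, `SketchRetriableException`, and `CommitCheckSketch` are hypothetical names, and the sketch assumes (as the JIRA title suggests) that asking a future which has not failed for its exception is treated as an illegal state.

```java
// Hypothetical stand-in for a retriable error type; not the client's real class.
class SketchRetriableException extends RuntimeException {
    SketchRetriableException(String message) { super(message); }
}

// Hypothetical, simplified future. Assumption: asking for the exception of a
// future that has not failed is an illegal state.
class SketchFuture {
    private boolean done = false;
    private RuntimeException error = null;

    void complete() { done = true; }

    void raise(RuntimeException e) { done = true; error = e; }

    boolean failed() { return done && error != null; }

    RuntimeException exception() {
        if (!failed())
            throw new IllegalStateException("future has not failed");
        return error;
    }

    boolean isRetriable() {
        return exception() instanceof SketchRetriableException;
    }
}

class CommitCheckSketch {
    // Shape of the check before the change: unsafe on a pending future.
    static boolean fatalBeforeFix(SketchFuture future) {
        return !future.isRetriable();
    }

    // Shape of the check after the change: failed() short-circuits first,
    // so isRetriable() is only reached for a future that actually failed.
    static boolean fatalAfterFix(SketchFuture future) {
        return future.failed() && !future.isRetriable();
    }

    public static void main(String[] args) {
        SketchFuture pending = new SketchFuture(); // poll returned before completion
        System.out.println("after fix, pending is fatal: " + fatalAfterFix(pending));
        try {
            fatalBeforeFix(pending);
        } catch (IllegalStateException e) {
            System.out.println("before fix, pending throws: " + e.getMessage());
        }
    }
}
```

Under these assumptions, a commit attempted with a timeout of 0 can leave the future pending when the poll returns. The guarded check then falls through instead of throwing, which is consistent with the new test expecting `commitOffsetsSync` to return false.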


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e0099e1f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e0099e1f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e0099e1f

Branch: refs/heads/trunk
Commit: e0099e1f5852536918bfa27b4d1c33779e45ab6b
Parents: fe4a469
Author: umesh chaudhary <umesh9794@gmail.com>
Authored: Thu Jul 20 08:54:21 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu Jul 20 08:54:25 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/internals/ConsumerCoordinator.java | 2 +-
 .../clients/consumer/internals/ConsumerCoordinatorTest.java   | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e0099e1f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b35a571..d39b369 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -610,7 +610,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 return true;
             }
 
-            if (!future.isRetriable())
+            if (future.failed() && !future.isRetriable())
                 throw future.exception();
 
             time.sleep(retryBackoffMs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0099e1f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 7d22351..fc0ddff 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1288,6 +1288,13 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testCommitOffsetSyncWithoutFutureGetsCompleted() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady();
+        assertFalse(coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), 0));
+    }
+
+    @Test
     public void testRefreshOffset() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();

