kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3527: Consumer commitAsync should not expose internal exceptions
Date Thu, 05 May 2016 02:21:10 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 64451af9e -> ad3165097


KAFKA-3527: Consumer commitAsync should not expose internal exceptions

Author: Liquan Pei <liquanpei@gmail.com>

Reviewers: Grant Henke <granthenke@gmail.com>, Jason Gustafson <jason@confluent.io>,
Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1300 from Ishiihara/kafka-3527


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ad316509
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ad316509
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ad316509

Branch: refs/heads/trunk
Commit: ad316509787787afeed6e2a24a62fd22cadd09c7
Parents: 64451af
Author: Liquan Pei <liquanpei@gmail.com>
Authored: Wed May 4 19:20:54 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Wed May 4 19:20:54 2016 -0700

----------------------------------------------------------------------
 .../clients/consumer/CommitFailedException.java |  1 -
 .../clients/consumer/OffsetCommitCallback.java  | 13 +++++++-
 .../RetriableCommitFailedException.java         | 32 ++++++++++++++++++++
 .../consumer/internals/ConsumerCoordinator.java |  9 +++++-
 .../internals/ConsumerCoordinatorTest.java      | 12 +++++---
 5 files changed, 59 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ad316509/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
index 39468bd..26ef48e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
@@ -31,5 +31,4 @@ public class CommitFailedException extends KafkaException {
     public CommitFailedException(String message) {
         super(message);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ad316509/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
index 97a06ad..dfa8391 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
@@ -14,6 +14,7 @@ package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collection;
 import java.util.Map;
 
 /**
@@ -28,6 +29,16 @@ public interface OffsetCommitCallback {
      *
      * @param offsets A map of the offsets and associated metadata that this callback applies to
      * @param exception The exception thrown during processing of the request, or null if the commit completed successfully
+     *
+     * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
+     *             This can only occur if you are using automatic group management with {@link KafkaConsumer#subscribe(Collection)},
+     *             or if there is an active group with the same groupId which is using group management.
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link KafkaConsumer#wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+     *             configured groupId
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
+     *             is too large or if the committed offset is invalid).
      */
     void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ad316509/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
new file mode 100644
index 0000000..459a8ac
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.errors.RetriableException;
+
+public class RetriableCommitFailedException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public RetriableCommitFailedException(String message) {
+        super(message);
+    }
+
+    public RetriableCommitFailedException(String message, Throwable t) {
+        super(message, t);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ad316509/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 7486969..d44d8eb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -23,7 +23,9 @@ import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscriptio
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.internals.TopicConstants;
@@ -354,6 +356,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
+
     public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
         this.subscriptions.needRefreshCommits();
         RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
@@ -368,7 +371,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
             @Override
             public void onFailure(RuntimeException e) {
-                cb.onComplete(offsets, e);
+                if (e instanceof RetriableException) {
+                    cb.onComplete(offsets, new RetriableCommitFailedException("Commit offsets failed with retriable exception. You should retry committing offsets.", e));
+                } else {
+                    cb.onComplete(offsets, e);
+                }
             }
         });
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ad316509/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 5a174db..bb31acf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
@@ -85,7 +86,7 @@ public class ConsumerCoordinatorTest {
     private boolean autoCommitEnabled = false;
     private long autoCommitIntervalMs = 2000;
     private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
-    private List<PartitionAssignor> assignors = Arrays.<PartitionAssignor>asList(partitionAssignor);
+    private List<PartitionAssignor> assignors = Collections.<PartitionAssignor>singletonList(partitionAssignor);
     private MockTime time;
     private MockClient client;
     private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
@@ -98,6 +99,7 @@ public class ConsumerCoordinatorTest {
     private MockCommitCallback defaultOffsetCommitCallback;
     private ConsumerCoordinator coordinator;
 
+
     @Before
     public void setup() {
         this.time = new MockTime();
@@ -898,7 +900,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
         assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
-        assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), defaultOffsetCommitCallback.exception);
+        assertTrue(defaultOffsetCommitCallback.exception instanceof RetriableCommitFailedException);
     }
 
     @Test
@@ -913,7 +915,7 @@ public class ConsumerCoordinatorTest {
 
         assertTrue(coordinator.coordinatorUnknown());
         assertEquals(1, cb.invoked);
-        assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), cb.exception);
+        assertTrue(cb.exception instanceof RetriableCommitFailedException);
     }
 
     @Test
@@ -928,7 +930,7 @@ public class ConsumerCoordinatorTest {
 
         assertTrue(coordinator.coordinatorUnknown());
         assertEquals(1, cb.invoked);
-        assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.exception(), cb.exception);
+        assertTrue(cb.exception instanceof RetriableCommitFailedException);
     }
 
     @Test
@@ -943,7 +945,7 @@ public class ConsumerCoordinatorTest {
 
         assertTrue(coordinator.coordinatorUnknown());
         assertEquals(1, cb.invoked);
-        assertTrue(cb.exception instanceof DisconnectException);
+        assertTrue(cb.exception instanceof RetriableCommitFailedException);
     }
 
     @Test


Mime
View raw message