kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-2389: remove commit type from new consumer.
Date Fri, 11 Sep 2015 22:47:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fd1239658 -> 845514d62


KAFKA-2389: remove commit type from new consumer.

A first pass at removing the commit type from the new consumer. The coordinator constructor takes a default offset commit callback, mainly for testing purposes.
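
For reference, the reshaping replaces the CommitType parameter with distinct method names. A minimal before/after sketch, assuming an already-constructed consumer and an offsets map and callback in scope (both illustrative, not part of this patch):

    // before: blocking behavior selected by an enum argument
    //   consumer.commit(CommitType.SYNC);
    //   consumer.commit(offsets, CommitType.ASYNC, callback);

    // after: blocking behavior encoded in the method name
    consumer.commitSync();                   // blocks; throws on unrecoverable error
    consumer.commitSync(offsets);            // blocking commit of explicit offsets
    consumer.commitAsync();                  // non-blocking; errors go to the default callback
    consumer.commitAsync(callback);          // non-blocking; errors passed to the callback
    consumer.commitAsync(offsets, callback); // non-blocking commit of explicit offsets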

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Ewen Cheslack-Postava, Jason Gustafson, Guozhang Wang

Closes #134 from becketqin/KAFKA-2389


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/845514d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/845514d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/845514d6

Branch: refs/heads/trunk
Commit: 845514d62329be8382e6d02b8041fc858718d534
Parents: fd12396
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Fri Sep 11 15:49:51 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Sep 11 15:49:51 2015 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/Consumer.java |  21 ++--
 .../consumer/ConsumerCommitCallback.java        |  33 ------
 .../kafka/clients/consumer/KafkaConsumer.java   | 104 ++++++++++---------
 .../kafka/clients/consumer/MockConsumer.java    |  19 ++--
 .../clients/consumer/OffsetCommitCallback.java  |  33 ++++++
 .../clients/consumer/internals/Coordinator.java |  67 +++++-------
 .../clients/consumer/MockConsumerTest.java      |   2 +-
 .../consumer/internals/CoordinatorTest.java     |  77 +++++++-------
 .../kafka/copycat/runtime/WorkerSinkTask.java   |  21 ++--
 .../copycat/runtime/WorkerSinkTaskTest.java     |   7 +-
 .../kafka/api/ConsumerBounceTest.scala          |   4 +-
 .../integration/kafka/api/ConsumerTest.scala    |  14 +--
 .../integration/kafka/api/SSLConsumerTest.scala |   4 +-
 13 files changed, 212 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/845514d6/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 54e8869..b21ec89 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -72,24 +72,29 @@ public interface Consumer<K, V> extends Closeable {
     public ConsumerRecords<K, V> poll(long timeout);
 
     /**
-     * @see KafkaConsumer#commit(CommitType)
+     * @see KafkaConsumer#commitSync()
      */
-    public void commit(CommitType commitType);
+    public void commitSync();
 
     /**
-     * @see KafkaConsumer#commit(CommitType, ConsumerCommitCallback)
+     * @see KafkaConsumer#commitSync(Map)
      */
-    public void commit(CommitType commitType, ConsumerCommitCallback callback);
+    public void commitSync(Map<TopicPartition, Long> offsets);
 
     /**
-     * @see KafkaConsumer#commit(Map, CommitType)
+     * @see KafkaConsumer#commitAsync()
      */
-    public void commit(Map<TopicPartition, Long> offsets, CommitType commitType);
+    public void commitAsync();
 
     /**
-     * @see KafkaConsumer#commit(Map, CommitType, ConsumerCommitCallback)
+     * @see KafkaConsumer#commitAsync(OffsetCommitCallback)
      */
-    public void commit(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback);
+    public void commitAsync(OffsetCommitCallback callback);
+
+    /**
+     * @see KafkaConsumer#commitAsync(Map, OffsetCommitCallback)
+     */
+    public void commitAsync(Map<TopicPartition, Long> offsets, OffsetCommitCallback callback);
 
     /**
      * @see KafkaConsumer#seek(TopicPartition, long)

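Since the synchronous variants report failure by throwing rather than via a callback, call sites that previously passed a callback to a SYNC commit now wrap the call instead. A hedged sketch (catching KafkaException follows the WorkerSinkTask change further down; the error handling itself is illustrative):

    try {
        consumer.commitSync();
    } catch (KafkaException e) {
        // unrecoverable commit failure; rethrown to the caller rather than passed to a callback
        System.err.println("Synchronous offset commit failed: " + e);
    }
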
http://git-wip-us.apache.org/repos/asf/kafka/blob/845514d6/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
deleted file mode 100644
index f084385..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.consumer;
-
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.Map;
-
-/**
- * A callback interface that the user can implement to trigger custom actions when a commit request completes. The callback
- * may be executed in any thread calling {@link Consumer#poll(long) poll()}.
- */
-public interface ConsumerCommitCallback {
-
-    /**
-     * A callback method the user can implement to provide asynchronous handling of commit request completion.
-     * This method will be called when the commit request sent to the server has been acknowledged.
-     *
-     * @param offsets A map of the offsets that this callback applies to
-     * @param exception The exception thrown during processing of the request, or null if the commit completed successfully
-     */
-    void onComplete(Map<TopicPartition, Long> offsets, Exception exception);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/845514d6/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 5763bac..3ac2be8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -81,10 +81,10 @@ import java.util.regex.Pattern;
  * out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances
 * every time the consumer receives messages in a call to {@link #poll(long)}.
  * <p>
- * The {@link #commit(CommitType) committed position} is the last offset that has been saved securely. Should the
+ * The {@link #commitSync() committed position} is the last offset that has been saved securely. Should the
  * process fail and restart, this is the offset that it will recover to. The consumer can either automatically commit
  * offsets periodically, or it can choose to control this committed position manually by calling
- * {@link #commit(CommitType) commit}.
+ * {@link #commitSync() commit}.
  * <p>
  * This distinction gives the consumer control over when a record is considered consumed. It is discussed in further
  * detail below.
@@ -198,7 +198,7 @@ import java.util.regex.Pattern;
  *             buffer.add(record);
  *             if (buffer.size() &gt;= commitInterval) {
  *                 insertIntoDb(buffer);
- *                 consumer.commit(CommitType.SYNC);
+ *                 consumer.commitSync();
  *                 buffer.clear();
  *             }
  *         }
@@ -542,7 +542,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>, Metadata.Listener {
                     metricsTags,
                     this.time,
                     requestTimeoutMs,
-                    retryBackoffMs);
+                    retryBackoffMs,
+                    new Coordinator.DefaultOffsetCommitCallback());
             if (keyDeserializer == null) {
                 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                         Deserializer.class);
@@ -752,8 +753,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>, Metadata.Listener {
      * The offset used for fetching the data is governed by whether or not {@link #seek(TopicPartition, long)} is used.
      * If {@link #seek(TopicPartition, long)} is used, it will use the specified offsets on startup and on every
      * rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed
-     * offset using {@link #commit(Map, CommitType) commit(offsets, sync)} for the subscribed list of partitions.
-     *
+     * offset using {@link #commitSync(Map) commit(offsets)} for the subscribed list of partitions.
+     * 
      * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns
      *            immediately with any records available now. Must not be negative.
      * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
@@ -821,7 +822,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>, Metadata.Listener {
     private void scheduleAutoCommitTask(final long interval) {
         DelayedTask task = new DelayedTask() {
             public void run(long now) {
-                commit(CommitType.ASYNC);
+                commitAsync(new OffsetCommitCallback() {
+                    @Override
+                    public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
+                        if (exception != null)
+                            log.error("Auto offset commit failed.", exception);
+                    }
+                });
                 client.schedule(this, now + interval);
             }
         };
@@ -829,24 +836,23 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>, Metadata.Listener {
     }
 
     /**
-     * Commits the specified offsets for the specified list of topics and partitions to Kafka.
+     * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
      * <p>
-     * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
-     * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+     * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
+     * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
      * should not be used.
      * <p>
-     * Asynchronous commits (i.e. {@link CommitType#ASYNC} will not block. Any errors encountered during an asynchronous
-     * commit are silently discarded. If you need to determine the result of an asynchronous commit, you should use
-     * {@link #commit(Map, CommitType, ConsumerCommitCallback)}. Synchronous commits (i.e. {@link CommitType#SYNC})
-     * block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown
-     * to the caller).
-     *
-     * @param offsets The list of offsets per partition that should be committed to Kafka.
-     * @param commitType Control whether the commit is blocking
+     * This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is
+     * encountered (in which case it is thrown to the caller).
      */
     @Override
-    public void commit(final Map<TopicPartition, Long> offsets, CommitType commitType) {
-        commit(offsets, commitType, null);
+    public void commitSync() {
+        acquire();
+        try {
+            commitSync(subscriptions.allConsumed());
+        } finally {
+            release();
+        }
     }
 
     /**
@@ -856,69 +862,73 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>, Metadata.Listener {
      * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
      * should not be used.
      * <p>
-     * Asynchronous commits (i.e. {@link CommitType#ASYNC} will not block. Any errors encountered during an asynchronous
-     * commit are either passed to the callback (if provided) or silently discarded. Synchronous commits (i.e.
-     * {@link CommitType#SYNC}) block until either the commit succeeds or an unrecoverable error is encountered. In
-     * this case, the error is either passed to the callback (if provided) or thrown to the caller.
+     * This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is
+     * encountered (in which case it is thrown to the caller).
      *
      * @param offsets The list of offsets per partition that should be committed to Kafka.
-     * @param commitType Control whether the commit is blocking
-     * @param callback Callback to invoke when the commit completes
      */
     @Override
-    public void commit(final Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
+    public void commitSync(final Map<TopicPartition, Long> offsets) {
         acquire();
         try {
-            log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets);
-            coordinator.commitOffsets(offsets, commitType, callback);
+            coordinator.commitOffsetsSync(offsets);
         } finally {
             release();
         }
     }
 
     /**
+     * Convenience method. Equivalent to {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}.
+     */
+    @Override
+    public void commitAsync() {
+        commitAsync(null);
+    }
+
+    /**
      * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
      * <p>
      * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
      * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
      * should not be used.
      * <p>
-     * Asynchronous commits (i.e. {@link CommitType#ASYNC} will not block. Any errors encountered during an asynchronous
-     * commit are either passed to the callback (if provided) or silently discarded. Synchronous commits (i.e.
-     * {@link CommitType#SYNC}) block until either the commit succeeds or an unrecoverable error is encountered. In
-     * this case, the error is either passed to the callback (if provided) or thrown to the caller.
+     * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback
+     * (if provided) or discarded.
      *
-     * @param commitType Whether or not the commit should block until it is acknowledged.
      * @param callback Callback to invoke when the commit completes
      */
     @Override
-    public void commit(CommitType commitType, ConsumerCommitCallback callback) {
+    public void commitAsync(OffsetCommitCallback callback) {
         acquire();
         try {
-            commit(subscriptions.allConsumed(), commitType, callback);
+            commitAsync(subscriptions.allConsumed(), callback);
         } finally {
             release();
         }
     }
 
     /**
-     * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
+     * Commits the specified offsets for the specified list of topics and partitions to Kafka.
      * <p>
-     * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
-     * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+     * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
+     * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
      * should not be used.
      * <p>
-     * Asynchronous commits (i.e. {@link CommitType#ASYNC} will not block. Any errors encountered during an asynchronous
-     * commit are silently discarded. If you need to determine the result of an asynchronous commit, you should use
-     * {@link #commit(CommitType, ConsumerCommitCallback)}. Synchronous commits (i.e. {@link CommitType#SYNC})
-     * block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown
-     * to the caller).
+     * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback
+     * (if provided) or discarded.
      *
-     * @param commitType Whether or not the commit should block until it is acknowledged.
+     * @param offsets The list of offsets per partition that should be committed to Kafka.
+     * @param callback Callback to invoke when the commit completes
      */
     @Override
-    public void commit(CommitType commitType) {
-        commit(commitType, null);
+    public void commitAsync(final Map<TopicPartition, Long> offsets, OffsetCommitCallback callback) {
+        acquire();
+        try {
+            log.debug("Committing offsets: {} ", offsets);
+            coordinator.commitOffsetsAsync(offsets, callback);
+        } finally {
+            release();
+        }
     }
 
     /**

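Because asynchronous commit errors are delivered only through the callback (or handled by the default logging callback when none is given), callers that care about failures should supply one. A small sketch, with the failure handling purely illustrative:

    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
            if (exception != null)
                System.err.println("Async commit of " + offsets + " failed: " + exception);
        }
    });
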
http://git-wip-us.apache.org/repos/asf/kafka/blob/845514d6/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 7e038f2..78c9c15 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -131,7 +131,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
+    public synchronized void commitAsync(Map<TopicPartition, Long> offsets, OffsetCommitCallback callback) {
         ensureNotClosed();
         for (Entry<TopicPartition, Long> entry : offsets.entrySet())
             subscriptions.committed(entry.getKey(), entry.getValue());
@@ -141,19 +141,24 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
-        commit(offsets, commitType, null);
+    public synchronized void commitSync(Map<TopicPartition, Long> offsets) {
+        commitAsync(offsets, null);
     }
 
     @Override
-    public synchronized void commit(CommitType commitType, ConsumerCommitCallback callback) {
+    public synchronized void commitAsync() {
+        commitAsync(null);
+    }
+
+    @Override
+    public synchronized void commitAsync(OffsetCommitCallback callback) {
         ensureNotClosed();
-        commit(this.subscriptions.allConsumed(), commitType, callback);
+        commitAsync(this.subscriptions.allConsumed(), callback);
     }
 
     @Override
-    public synchronized void commit(CommitType commitType) {
-        commit(commitType, null);
+    public synchronized void commitSync() {
+        commitSync(this.subscriptions.allConsumed());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/845514d6/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
new file mode 100644
index 0000000..bed82f6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+
+/**
+ * A callback interface that the user can implement to trigger custom actions when a commit request completes. The callback
+ * may be executed in any thread calling {@link Consumer#poll(long) poll()}.
+ */
+public interface OffsetCommitCallback {
+
+    /**
+     * A callback method the user can implement to provide asynchronous handling of commit request completion.
+     * This method will be called when the commit request sent to the server has been acknowledged.
+     *
+     * @param offsets A map of the offsets that this callback applies to
+     * @param exception The exception thrown during processing of the request, or null if the commit completed successfully
+     */
+    void onComplete(Map<TopicPartition, Long> offsets, Exception exception);
+}
\ No newline at end of file

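The interface has a single method, so implementations can stay tiny; the CountConsumerCommitCallback in ConsumerTest.scala below is one example, and an equivalent Java sketch looks like:

    public class CountingCommitCallback implements OffsetCommitCallback {
        public int count = 0;

        @Override
        public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
            count++; // exception is null when the commit succeeded
        }
    }
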
http://git-wip-us.apache.org/repos/asf/kafka/blob/845514d6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index b804796..e7ffe25 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -13,8 +13,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
-import org.apache.kafka.clients.consumer.CommitType;
-import org.apache.kafka.clients.consumer.ConsumerCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
@@ -71,11 +70,11 @@ public final class Coordinator {
     private final CoordinatorMetrics sensors;
     private final long requestTimeoutMs;
     private final long retryBackoffMs;
+    private final OffsetCommitCallback defaultOffsetCommitCallback;
     private Node consumerCoordinator;
     private String consumerId;
     private int generation;
 
-
     /**
      * Initialize the coordination manager.
      */
@@ -90,8 +89,8 @@ public final class Coordinator {
                        Map<String, String> metricTags,
                        Time time,
                        long requestTimeoutMs,
-                       long retryBackoffMs) {
-
+                       long retryBackoffMs,
+                       OffsetCommitCallback defaultOffsetCommitCallback) {
         this.client = client;
         this.time = time;
         this.generation = -1;
@@ -106,6 +105,7 @@ public final class Coordinator {
         this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
         this.requestTimeoutMs = requestTimeoutMs;
         this.retryBackoffMs = retryBackoffMs;
+        this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
     }
 
     /**
@@ -215,21 +215,6 @@ public final class Coordinator {
         }
     }
 
-    /**
-     * Commit offsets. This call blocks (regardless of commitType) until the coordinator
-     * can receive the commit request. Once the request has been made, however, only the
-     * synchronous commits will wait for a successful response from the coordinator.
-     * @param offsets Offsets to commit.
-     * @param commitType Commit policy
-     * @param callback Callback to be executed when the commit request finishes
-     */
-    public void commitOffsets(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
-        if (commitType == CommitType.ASYNC)
-            commitOffsetsAsync(offsets, callback);
-        else
-            commitOffsetsSync(offsets, callback);
-    }
-
     private class HeartbeatTask implements DelayedTask {
 
         public void reset() {
@@ -363,25 +348,24 @@ public final class Coordinator {
         }
     }
 
-    private void commitOffsetsAsync(final Map<TopicPartition, Long> offsets, final ConsumerCommitCallback callback) {
+    public void commitOffsetsAsync(final Map<TopicPartition, Long> offsets, OffsetCommitCallback callback) {
         this.subscriptions.needRefreshCommits();
         RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
-        if (callback != null) {
-            future.addListener(new RequestFutureListener<Void>() {
-                @Override
-                public void onSuccess(Void value) {
-                    callback.onComplete(offsets, null);
-                }
+        final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
+        future.addListener(new RequestFutureListener<Void>() {
+            @Override
+            public void onSuccess(Void value) {
+                cb.onComplete(offsets, null);
+            }
 
-                @Override
-                public void onFailure(RuntimeException e) {
-                    callback.onComplete(offsets, e);
-                }
-            });
-        }
+            @Override
+            public void onFailure(RuntimeException e) {
+                cb.onComplete(offsets, e);
+            }
+        });
     }
 
-    private void commitOffsetsSync(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback) {
+    public void commitOffsetsSync(Map<TopicPartition, Long> offsets) {
         while (true) {
             ensureCoordinatorKnown();
             ensurePartitionAssignment();
@@ -390,17 +374,11 @@ public final class Coordinator {
             client.poll(future);
 
             if (future.succeeded()) {
-                if (callback != null)
-                    callback.onComplete(offsets, null);
                 return;
             }
 
             if (!future.isRetriable()) {
-                if (callback == null)
-                    throw future.exception();
-                else
-                    callback.onComplete(offsets, future.exception());
-                return;
+                throw future.exception();
             }
 
             Utils.sleep(retryBackoffMs);
@@ -437,6 +415,13 @@ public final class Coordinator {
                 .compose(new OffsetCommitResponseHandler(offsets));
     }
 
+    public static class DefaultOffsetCommitCallback implements OffsetCommitCallback {
+        @Override
+        public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
+            if (exception != null)
+                log.error("Offset commit failed.", exception);
+        }
+    }
 
     private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/845514d6/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 6e6a118..068322f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -58,7 +58,7 @@ public class MockConsumerTest {
         assertEquals(rec2, iter.next());
         assertFalse(iter.hasNext());
         assertEquals(1L, consumer.position(new TopicPartition("test", 0)));
-        consumer.commit(CommitType.SYNC);
+        consumer.commitSync();
         assertEquals(1L, consumer.committed(new TopicPartition("test", 0)));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/845514d6/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index 3452639..9eb2a27 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -23,8 +23,7 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.clients.consumer.CommitType;
-import org.apache.kafka.clients.consumer.ConsumerCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
@@ -74,6 +73,7 @@ public class CoordinatorTest {
     private Map<String, String> metricTags = new LinkedHashMap<String, String>();
     private ConsumerNetworkClient consumerClient;
     private MockSubscriptionListener subscriptionListener;
+    private MockCommitCallback defaultOffsetCommitCallback;
     private Coordinator coordinator;
 
     @Before
@@ -85,6 +85,7 @@ public class CoordinatorTest {
         this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
         this.metrics = new Metrics(time);
         this.subscriptionListener = new MockSubscriptionListener();
+        this.defaultOffsetCommitCallback = new MockCommitCallback();
 
         client.setNode(node);
 
@@ -99,7 +100,8 @@ public class CoordinatorTest {
                 metricTags,
                 time,
                 requestTimeoutMs,
-                retryBackoffMs);
+                retryBackoffMs,
+                defaultOffsetCommitCallback);
     }
 
     @Test
@@ -306,12 +308,36 @@ public class CoordinatorTest {
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
 
         AtomicBoolean success = new AtomicBoolean(false);
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.ASYNC, callback(success));
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, 100L), callback(success));
         consumerClient.poll(0);
         assertTrue(success.get());
     }
 
     @Test
+    public void testCommitOffsetAsyncWithDefaultCallback() {
+        int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, 100L), null);
+        consumerClient.poll(0);
+        assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
+        assertNull(defaultOffsetCommitCallback.exception);
+    }
+
+    @Test
+    public void testCommitOffsetAsyncFailedWithDefaultCallback() {
+        int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, 100L), null);
+        consumerClient.poll(0);
+        assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
+        assertEquals(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception(), defaultOffsetCommitCallback.exception);
+    }
+
+    @Test
     public void testCommitOffsetAsyncCoordinatorNotAvailable() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorKnown();
@@ -319,7 +345,7 @@ public class CoordinatorTest {
         // async commit with coordinator not available
         MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.ASYNC, cb);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, 100L), cb);
         consumerClient.poll(0);
 
         assertTrue(coordinator.coordinatorUnknown());
@@ -335,7 +361,7 @@ public class CoordinatorTest {
         // async commit with not coordinator
         MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.ASYNC, cb);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, 100L), cb);
         consumerClient.poll(0);
 
         assertTrue(coordinator.coordinatorUnknown());
@@ -351,7 +377,7 @@ public class CoordinatorTest {
         // async commit with coordinator disconnected
         MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.ASYNC, cb);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, 100L), cb);
         consumerClient.poll(0);
 
         assertTrue(coordinator.coordinatorUnknown());
@@ -365,13 +391,10 @@ public class CoordinatorTest {
         coordinator.ensureCoordinatorKnown();
 
        // sync commit with not-coordinator error (should refresh coordinator metadata and then resubmit the commit request)
-        MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.SYNC, cb);
-        assertEquals(1, cb.invoked);
-        assertNull(cb.exception);
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, 100L));
     }
 
     @Test
@@ -380,13 +403,10 @@ public class CoordinatorTest {
         coordinator.ensureCoordinatorKnown();
 
        // sync commit with coordinator-not-available error (should refresh coordinator metadata and then resubmit the commit request)
-        MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.SYNC, cb);
-        assertEquals(1, cb.invoked);
-        assertNull(cb.exception);
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, 100L));
     }
 
     @Test
@@ -395,35 +415,20 @@ public class CoordinatorTest {
         coordinator.ensureCoordinatorKnown();
 
         // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
-        MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.SYNC, cb);
-        assertEquals(1, cb.invoked);
-        assertNull(cb.exception);
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, 100L));
     }
 
     @Test(expected = ApiException.class)
-    public void testCommitOffsetSyncThrowsNonRetriableException() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // sync commit with invalid partitions should throw if we have no callback
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code())), false);
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.SYNC, null);
-    }
-
-    @Test
-    public void testCommitOffsetSyncCallbackHandlesNonRetriableException() {
+    public void testCommitOffsetSyncCallbackWithNonRetriableException() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorKnown();
 
        // sync commit with invalid partitions should throw to the caller
-        MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code())), false);
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), CommitType.SYNC, cb);
-        assertTrue(cb.exception instanceof ApiException);
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, 100L));
     }
 
     @Test
@@ -507,8 +512,8 @@ public class CoordinatorTest {
         return response.toStruct();
     }
 
-    private ConsumerCommitCallback callback(final AtomicBoolean success) {
-        return new ConsumerCommitCallback() {
+    private OffsetCommitCallback callback(final AtomicBoolean success) {
+        return new OffsetCommitCallback() {
             @Override
             public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
                 if (exception == null)
@@ -517,7 +522,7 @@ public class CoordinatorTest {
         };
     }
 
-    private static class MockCommitCallback implements ConsumerCommitCallback {
+    private static class MockCommitCallback implements OffsetCommitCallback {
         public int invoked = 0;
         public Exception exception = null;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/845514d6/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index 7e71fb8..1047213 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.copycat.runtime;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.consumer.*;
@@ -137,13 +138,21 @@ class WorkerSinkTask implements WorkerTask {
             }
         }
 
-        ConsumerCommitCallback cb = new ConsumerCommitCallback() {
-            @Override
-            public void onComplete(Map<TopicPartition, Long> offsets, Exception error) {
-                workThread.onCommitCompleted(error, seqno);
+        if (sync) {
+            try {
+                consumer.commitSync(offsets);
+            } catch (KafkaException e) {
+                workThread.onCommitCompleted(e, seqno);
             }
-        };
-        consumer.commit(offsets, sync ? CommitType.SYNC : CommitType.ASYNC, cb);
+        } else {
+            OffsetCommitCallback cb = new OffsetCommitCallback() {
+                @Override
+                public void onComplete(Map<TopicPartition, Long> offsets, Exception error) {
+                    workThread.onCommitCompleted(error, seqno);
+                }
+            };
+            consumer.commitAsync(offsets, cb);
+        }
     }
 
     public Time time() {

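For readability, here is the new commit dispatch in WorkerSinkTask condensed from the hunk above; the enclosing method signature is elided in the diff, so the one shown is a hypothetical stand-in:

    private void doCommitOffsets(Map<TopicPartition, Long> offsets, boolean sync, final int seqno) {
        if (sync) {
            try {
                consumer.commitSync(offsets);
            } catch (KafkaException e) {
                // sync failures surface as exceptions
                workThread.onCommitCompleted(e, seqno);
            }
        } else {
            // async failures surface through the callback argument
            consumer.commitAsync(offsets, new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, Long> offsets, Exception error) {
                    workThread.onCommitCompleted(error, seqno);
                }
            });
        }
    }
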
http://git-wip-us.apache.org/repos/asf/kafka/blob/845514d6/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index 542ed76..91469d3 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -328,7 +328,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         return capturedRecords;
     }
 
-    private Capture<ConsumerCommitCallback> expectOffsetFlush(final long expectedMessages,
+    private Capture<OffsetCommitCallback> expectOffsetFlush(final long expectedMessages,
                                                               final RuntimeException flushError,
                                                               final Exception consumerCommitError,
                                                               final long consumerCommitDelayMs,
@@ -353,10 +353,9 @@ public class WorkerSinkTaskTest extends ThreadedTest {
             return null;
         }
 
-        final Capture<ConsumerCommitCallback> capturedCallback = EasyMock.newCapture();
+        final Capture<OffsetCommitCallback> capturedCallback = EasyMock.newCapture();
         final Map<TopicPartition, Long> offsets = Collections.singletonMap(TOPIC_PARTITION, finalOffset);
-        consumer.commit(EasyMock.eq(offsets),
-                EasyMock.eq(CommitType.ASYNC),
+        consumer.commitAsync(EasyMock.eq(offsets),
                 EasyMock.capture(capturedCallback));
         PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/845514d6/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 666d62f..28a3e90 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -86,7 +86,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
         consumed += 1
       }
 
-      consumer.commit(CommitType.SYNC)
+      consumer.commitSync()
       assertEquals(consumer.position(tp), consumer.committed(tp))
 
       if (consumer.position(tp) == numRecords) {
@@ -130,7 +130,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
         assertEquals(pos, consumer.position(tp))
       } else if (coin == 2) {
         info("Committing offset.")
-        consumer.commit(CommitType.SYNC)
+        consumer.commitSync()
         assertEquals(consumer.position(tp), consumer.committed(tp))
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/845514d6/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index b393692..962d2b8 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -81,7 +81,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
 
     // check async commit callbacks
     val commitCallback = new CountConsumerCommitCallback()
-    this.consumers(0).commit(CommitType.ASYNC, commitCallback)
+    this.consumers(0).commitAsync(commitCallback)
 
     // shouldn't make progress until poll is invoked
     Thread.sleep(10)
@@ -186,7 +186,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     this.consumers(0).poll(50)
     val pos1 = this.consumers(0).position(tp)
     val pos2 = this.consumers(0).position(tp2)
-    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp, 3L)).asJava, CommitType.SYNC)
+    this.consumers(0).commitSync(Map[TopicPartition,java.lang.Long]((tp, 3L)).asJava)
     assertEquals(3, this.consumers(0).committed(tp))
     intercept[NoOffsetForPartitionException] {
       this.consumers(0).committed(tp2)
@@ -194,13 +194,13 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     // positions should not change
     assertEquals(pos1, this.consumers(0).position(tp))
     assertEquals(pos2, this.consumers(0).position(tp2))
-    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 5L)).asJava, CommitType.SYNC)
+    this.consumers(0).commitSync(Map[TopicPartition,java.lang.Long]((tp2, 5L)).asJava)
     assertEquals(3, this.consumers(0).committed(tp))
     assertEquals(5, this.consumers(0).committed(tp2))
 
     // Using async should pick up the committed changes after commit completes
     val commitCallback = new CountConsumerCommitCallback()
-    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 7L)).asJava, CommitType.ASYNC, commitCallback)
+    this.consumers(0).commitAsync(Map[TopicPartition,java.lang.Long]((tp2, 7L)).asJava, commitCallback)
     awaitCommitCallback(this.consumers(0), commitCallback)
     assertEquals(7, this.consumers(0).committed(tp2))
   }
@@ -257,12 +257,12 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     this.consumers(0).assign(List(tp))
 
     assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
-    this.consumers(0).commit(CommitType.SYNC)
+    this.consumers(0).commitSync()
     assertEquals(0L, this.consumers(0).committed(tp))
 
     consumeRecords(this.consumers(0), 5, 0)
     assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
-    this.consumers(0).commit(CommitType.SYNC)
+    this.consumers(0).commitSync()
     assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp))
 
     sendRecords(1)
@@ -477,7 +477,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     assertEquals(startCount + 1, commitCallback.count)
   }
 
-  private class CountConsumerCommitCallback extends ConsumerCommitCallback {
+  private class CountConsumerCommitCallback extends OffsetCommitCallback {
     var count = 0
 
     override def onComplete(offsets: util.Map[TopicPartition, lang.Long], exception: Exception): Unit = count += 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/845514d6/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
index 057c70c..df9b3d8 100644
--- a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
@@ -182,12 +182,12 @@ class SSLConsumerTest extends KafkaServerTestHarness with Logging {
     this.consumers(0).assign(List(tp))
 
     assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
-    this.consumers(0).commit(CommitType.SYNC)
+    this.consumers(0).commitSync()
     assertEquals(0L, this.consumers(0).committed(tp))
 
     consumeRecords(this.consumers(0), 5, 0)
     assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
-    this.consumers(0).commit(CommitType.SYNC)
+    this.consumers(0).commitSync()
     assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp))
 
     sendRecords(1)

