kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-2403; Add support for commit metadata in KafkaConsumer
Date Wed, 23 Sep 2015 21:06:33 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2837fa5a3 -> d1dd1e902


KAFKA-2403; Add support for commit metadata in KafkaConsumer

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma, Guozhang Wang

Closes #119 from hachikuji/KAFKA-2403
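
For readers browsing the archive, here is a minimal usage sketch of the API this
commit introduces (the topic, offset, and metadata string are illustrative, not
taken from the patch; 'consumer' is assumed to be a configured KafkaConsumer):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // assumes this partition is assigned to 'consumer'
    TopicPartition tp = new TopicPartition("my-topic", 0);

    // attach a free-form string (here, the committing host) to the offset commit
    consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(42L, "host=worker-1")));

    // the metadata is stored with the offset and can be fetched back
    OffsetAndMetadata committed = consumer.committed(tp);
    System.out.println(committed.offset() + " " + committed.metadata()); // 42 host=worker-1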


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d1dd1e90
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d1dd1e90
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d1dd1e90

Branch: refs/heads/trunk
Commit: d1dd1e902685178a990861f9948fe07f820cf4ef
Parents: 2837fa5
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Sep 23 14:10:06 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Sep 23 14:10:06 2015 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |  2 +
 .../apache/kafka/clients/consumer/Consumer.java |  6 +-
 .../kafka/clients/consumer/KafkaConsumer.java   | 20 ++---
 .../kafka/clients/consumer/MockConsumer.java    |  8 +-
 .../clients/consumer/OffsetAndMetadata.java     | 80 ++++++++++++++++++++
 .../clients/consumer/OffsetCommitCallback.java  |  4 +-
 .../clients/consumer/internals/Coordinator.java | 78 ++++++++++---------
 .../clients/consumer/internals/Fetcher.java     |  6 +-
 .../consumer/internals/RequestFuture.java       |  4 +
 .../consumer/internals/SubscriptionState.java   | 17 +++--
 .../common/requests/OffsetCommitRequest.java    | 12 +--
 .../clients/consumer/MockConsumerTest.java      |  2 +-
 .../consumer/internals/CoordinatorTest.java     | 66 ++++++++++++----
 .../clients/consumer/internals/FetcherTest.java |  3 +-
 .../internals/SubscriptionStateTest.java        | 18 ++++-
 .../org/apache/kafka/copycat/sink/SinkTask.java |  3 +-
 .../kafka/copycat/file/FileStreamSinkTask.java  |  3 +-
 .../copycat/file/FileStreamSinkTaskTest.java    |  9 ++-
 .../kafka/copycat/runtime/WorkerSinkTask.java   |  6 +-
 .../copycat/runtime/WorkerSinkTaskTest.java     |  4 +-
 .../kafka/api/ConsumerBounceTest.scala          |  4 +-
 .../integration/kafka/api/ConsumerTest.scala    | 40 +++++++---
 .../integration/kafka/api/SSLConsumerTest.scala |  4 +-
 23 files changed, 276 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index eb682f4..f24977b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -131,6 +131,7 @@
 		</subpackage>
 
 		<subpackage name="sink">
+		        <allow pkg="org.apache.kafka.clients.consumer" />
 			<allow pkg="org.apache.kafka.copycat.connector" />
 			<allow pkg="org.apache.kafka.copycat.storage" />
 		</subpackage>
@@ -170,6 +171,7 @@
 
 		<subpackage name="file">
 			<allow pkg="org.apache.kafka.copycat" />
+		        <allow pkg="org.apache.kafka.clients.consumer" />
 			<!-- for tests -->
 			<allow pkg="org.easymock" />
 			<allow pkg="org.powermock" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index b21ec89..a3d8776 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -79,7 +79,7 @@ public interface Consumer<K, V> extends Closeable {
     /**
      * @see KafkaConsumer#commitSync(Map)
      */
-    public void commitSync(Map<TopicPartition, Long> offsets);
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
 
     /**
      * @see KafkaConsumer#commitAsync()
@@ -94,7 +94,7 @@ public interface Consumer<K, V> extends Closeable {
     /**
      * @see KafkaConsumer#commitAsync(Map, OffsetCommitCallback)
      */
-    public void commitAsync(Map<TopicPartition, Long> offsets, OffsetCommitCallback callback);
+    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
 
     /**
      * @see KafkaConsumer#seek(TopicPartition, long)
@@ -119,7 +119,7 @@ public interface Consumer<K, V> extends Closeable {
     /**
      * @see KafkaConsumer#committed(TopicPartition)
      */
-    public long committed(TopicPartition partition);
+    public OffsetAndMetadata committed(TopicPartition partition);
 
     /**
      * @see KafkaConsumer#metrics()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 8831b0b..0b67915 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -48,6 +48,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -837,7 +838,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             public void run(long now) {
                 commitAsync(new OffsetCommitCallback() {
                     @Override
-                    public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
+                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                         if (exception != null)
                             log.error("Auto offset commit failed.", exception);
                     }
@@ -878,10 +879,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     * This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is
      * encountered (in which case it is thrown to the caller).
      *
-     * @param offsets The list of offsets per partition that should be committed to Kafka.
+     * @param offsets A map of offsets by partition with associated metadata
      */
     @Override
-    public void commitSync(final Map<TopicPartition, Long> offsets) {
+    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
         acquire();
         try {
             coordinator.commitOffsetsSync(offsets);
@@ -930,15 +931,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback
      * (if provided) or discarded.
      *
-     * @param offsets The list of offsets per partition that should be committed to Kafka.
+     * @param offsets A map of offsets by partition with associated metadata. This map will be copied internally, so it
+     *                is safe to mutate the map after this call returns.
      * @param callback Callback to invoke when the commit completes
      */
     @Override
-    public void commitAsync(final Map<TopicPartition, Long> offsets, OffsetCommitCallback callback) {
+    public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
         acquire();
         try {
             log.debug("Committing offsets: {} ", offsets);
-            coordinator.commitOffsetsAsync(offsets, callback);
+            coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
         } finally {
             release();
         }
@@ -1033,10 +1035,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             partition.
      */
     @Override
-    public long committed(TopicPartition partition) {
+    public OffsetAndMetadata committed(TopicPartition partition) {
         acquire();
         try {
-            Long committed;
+            OffsetAndMetadata committed;
             if (subscriptions.isAssigned(partition)) {
                 committed = this.subscriptions.committed(partition);
                 if (committed == null) {
@@ -1044,7 +1046,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     committed = this.subscriptions.committed(partition);
                 }
             } else {
-                Map<TopicPartition, Long> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition));
+                Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition));
                 committed = offsets.get(partition);
             }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 0ba5797..e7e3618 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -131,9 +131,9 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public synchronized void commitAsync(Map<TopicPartition, Long> offsets, OffsetCommitCallback callback) {
+    public synchronized void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
         ensureNotClosed();
-        for (Entry<TopicPartition, Long> entry : offsets.entrySet())
+        for (Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet())
             subscriptions.committed(entry.getKey(), entry.getValue());
         if (callback != null) {
             callback.onComplete(offsets, null);
@@ -141,7 +141,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public synchronized void commitSync(Map<TopicPartition, Long> offsets) {
+    public synchronized void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
         commitAsync(offsets, null);
     }
 
@@ -168,7 +168,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public synchronized long committed(TopicPartition partition) {
+    public synchronized OffsetAndMetadata committed(TopicPartition partition) {
         ensureNotClosed();
         return subscriptions.committed(partition);
     }
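
A sketch of how the updated MockConsumer can exercise the metadata round-trip in a
unit test (this assumes the MockConsumer(OffsetResetStrategy) constructor present on
trunk; the names are illustrative):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;

    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition tp = new TopicPartition("test", 0);
    consumer.assign(Collections.singletonList(tp));

    // commitSync delegates to commitAsync, which caches the OffsetAndMetadata in SubscriptionState
    consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(1L, "checkpoint-7")));
    assert consumer.committed(tp).offset() == 1L;
    assert "checkpoint-7".equals(consumer.committed(tp).metadata());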

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
new file mode 100644
index 0000000..1a93047
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+/**
+ * The Kafka offset commit API allows users to provide additional metadata (in the form of a string)
+ * when an offset is committed. This can be useful (for example) to store information about which
+ * node made the commit, what time the commit was made, etc.
+ */
+public class OffsetAndMetadata {
+    private final long offset;
+    private final String metadata;
+
+    /**
+     * Construct a new OffsetAndMetadata object for committing through {@link KafkaConsumer}.
+     * @param offset The offset to be committed
+     * @param metadata Non-null metadata
+     */
+    public OffsetAndMetadata(long offset, String metadata) {
+        if (metadata == null)
+            throw new IllegalArgumentException("Metadata cannot be null");
+
+        this.offset = offset;
+        this.metadata = metadata;
+    }
+
+    /**
+     * Construct a new OffsetAndMetadata object for committing through {@link KafkaConsumer}. The metadata
+     * associated with the commit will be empty.
+     * @param offset The offset to be committed
+     */
+    public OffsetAndMetadata(long offset) {
+        this(offset, "");
+    }
+
+    public long offset() {
+        return offset;
+    }
+
+    public String metadata() {
+        return metadata;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        OffsetAndMetadata that = (OffsetAndMetadata) o;
+
+        if (offset != that.offset) return false;
+        return metadata == null ? that.metadata == null : metadata.equals(that.metadata);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = (int) (offset ^ (offset >>> 32));
+        result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "OffsetAndMetadata{" +
+                "offset=" + offset +
+                ", metadata='" + metadata + '\'' +
+                '}';
+    }
+}
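
One property of the new class worth noting: equals() and hashCode() give it value
semantics, which is what lets the test changes below compare whole commit maps
directly (e.g. the EasyMock.eq(offsets) expectation in WorkerSinkTaskTest). A tiny
illustration:

    OffsetAndMetadata a = new OffsetAndMetadata(100L, "hello");
    OffsetAndMetadata b = new OffsetAndMetadata(100L, "hello");
    assert a.equals(b) && a.hashCode() == b.hashCode(); // equal by value
    assert !a.equals(new OffsetAndMetadata(100L));      // metadata differs: "" vs "hello"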

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
index bed82f6..97a06ad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
@@ -26,8 +26,8 @@ public interface OffsetCommitCallback {
      * A callback method the user can implement to provide asynchronous handling of commit request completion.
      * This method will be called when the commit request sent to the server has been acknowledged.
      *
-     * @param offsets A map of the offsets that this callback applies to
+     * @param offsets A map of the offsets and associated metadata that this callback applies to
      * @param exception The exception thrown during processing of the request, or null if the commit completed successfully
      */
-    void onComplete(Map<TopicPartition, Long> offsets, Exception exception);
+    void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception);
 }
\ No newline at end of file
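
With the signature change, a user-supplied callback now receives the full
OffsetAndMetadata per partition. A hedged sketch of an async commit with error
handling ('consumer' and an slf4j 'log' are assumed to be in scope):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.OffsetCommitCallback;
    import org.apache.kafka.common.TopicPartition;

    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null)
                log.warn("Commit of {} failed", offsets, exception); // async errors arrive here, not as throws
        }
    });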

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 5efe300..81a7c9c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -14,6 +14,7 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
@@ -114,8 +115,8 @@ public final class Coordinator {
      */
     public void refreshCommittedOffsetsIfNeeded() {
         if (subscriptions.refreshCommitsNeeded()) {
-            Map<TopicPartition, Long> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
-            for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
+            Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
+            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                 TopicPartition tp = entry.getKey();
                 // verify assignment is still active
                 if (subscriptions.isAssigned(tp))
@@ -130,13 +131,13 @@ public final class Coordinator {
      * @param partitions The partitions to fetch offsets for
      * @return A map from partition to the committed offset
      */
-    public Map<TopicPartition, Long> fetchCommittedOffsets(Set<TopicPartition> partitions) {
+    public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
         while (true) {
             ensureCoordinatorKnown();
             ensurePartitionAssignment();
 
             // contact coordinator to fetch committed offsets
-            RequestFuture<Map<TopicPartition, Long>> future = sendOffsetFetchRequest(partitions);
+            RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
             client.poll(future);
 
             if (future.succeeded())
@@ -349,7 +350,7 @@ public final class Coordinator {
         }
     }
 
-    public void commitOffsetsAsync(final Map<TopicPartition, Long> offsets, OffsetCommitCallback callback) {
+    public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
         this.subscriptions.needRefreshCommits();
         RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
         final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
@@ -366,7 +367,7 @@ public final class Coordinator {
         });
     }
 
-    public void commitOffsetsSync(Map<TopicPartition, Long> offsets) {
+    public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
         while (true) {
             ensureCoordinatorKnown();
             ensurePartitionAssignment();
@@ -394,7 +395,7 @@ public final class Coordinator {
      * @param offsets The list of offsets per partition that should be committed.
      * @return A request future whose value indicates whether the commit was successful or not
      */
-    private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, Long> offsets) {
+    private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
         if (coordinatorUnknown())
             return RequestFuture.coordinatorNotAvailable();
 
@@ -402,10 +403,13 @@ public final class Coordinator {
             return RequestFuture.voidSuccess();
 
         // create the offset commit request
-        Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData;
-        offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
-        for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
-            offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), ""));
+        Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+            OffsetAndMetadata offsetAndMetadata = entry.getValue();
+            offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
+                    offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
+        }
+
         OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
                 this.generation,
                 this.consumerId,
@@ -418,7 +422,7 @@ public final class Coordinator {
 
     public static class DefaultOffsetCommitCallback implements OffsetCommitCallback {
         @Override
-        public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
+        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
             if (exception != null)
                 log.error("Offset commit failed.", exception);
         }
@@ -426,9 +430,9 @@ public final class Coordinator {
 
     private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
 
-        private final Map<TopicPartition, Long> offsets;
+        private final Map<TopicPartition, OffsetAndMetadata> offsets;
 
-        public OffsetCommitResponseHandler(Map<TopicPartition, Long> offsets) {
+        public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
             this.offsets = offsets;
         }
 
@@ -442,38 +446,32 @@ public final class Coordinator {
             sensors.commitLatency.record(response.requestLatencyMs());
             for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
                 TopicPartition tp = entry.getKey();
-                long offset = this.offsets.get(tp);
+                OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
+                long offset = offsetAndMetadata.offset();
+
                 short errorCode = entry.getValue();
                 if (errorCode == Errors.NONE.code()) {
                     log.debug("Committed offset {} for partition {}", offset, tp);
                     if (subscriptions.isAssigned(tp))
                         // update the local cache only if the partition is still assigned
-                        subscriptions.committed(tp, offset);
-                } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
-                        || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
-                    coordinatorDead();
-                    future.raise(Errors.forCode(errorCode));
-                    return;
-                } else if (errorCode == Errors.OFFSET_METADATA_TOO_LARGE.code()
-                        || errorCode == Errors.INVALID_COMMIT_OFFSET_SIZE.code()) {
-                    // do not need to throw the exception but just log the error
+                        subscriptions.committed(tp, offsetAndMetadata);
+                } else {
+                    if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+                            || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+                        coordinatorDead();
+                    } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
+                            || errorCode == Errors.ILLEGAL_GENERATION.code()) {
+                        // need to re-join group
+                        subscriptions.needReassignment();
+                    }
+
                     log.error("Error committing partition {} at offset {}: {}",
                             tp,
                             offset,
                             Errors.forCode(errorCode).exception().getMessage());
-                } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
-                        || errorCode == Errors.ILLEGAL_GENERATION.code()) {
-                    // need to re-join group
-                    subscriptions.needReassignment();
+
                     future.raise(Errors.forCode(errorCode));
                     return;
-                } else {
-                    // do not need to throw the exception but just log the error
-                    future.raise(Errors.forCode(errorCode));
-                    log.error("Error committing partition {} at offset {}: {}",
-                            tp,
-                            offset,
-                            Errors.forCode(errorCode).exception().getMessage());
                 }
             }
 
@@ -488,7 +486,7 @@ public final class Coordinator {
      * @param partitions The set of partitions to get offsets for.
      * @return A request future containing the committed offsets.
      */
-    private RequestFuture<Map<TopicPartition, Long>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
+    private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
         if (coordinatorUnknown())
             return RequestFuture.coordinatorNotAvailable();
 
@@ -501,7 +499,7 @@ public final class Coordinator {
                 .compose(new OffsetFetchResponseHandler());
     }
 
-    private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, Long>> {
+    private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> {
 
         @Override
         public OffsetFetchResponse parse(ClientResponse response) {
@@ -509,8 +507,8 @@ public final class Coordinator {
         }
 
         @Override
-        public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, Long>> future) {
-            Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(response.responseData().size());
+        public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
             for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                 TopicPartition tp = entry.getKey();
                 OffsetFetchResponse.PartitionData data = entry.getValue();
@@ -537,7 +535,7 @@ public final class Coordinator {
                     return;
                 } else if (data.offset >= 0) {
                     // record the position with the offset (-1 indicates no committed offset to fetch)
-                    offsets.put(tp, data.offset);
+                    offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
                 } else {
                     log.debug("No committed offset for partition " + tp);
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 0efd34d..487fb0d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -152,9 +152,9 @@ public class Fetcher<K, V> {
                 subscriptions.needOffsetReset(tp);
                 resetOffset(tp);
             } else {
-                log.debug("Resetting offset for partition {} to the committed offset {}",
-                        tp, subscriptions.committed(tp));
-                subscriptions.seek(tp, subscriptions.committed(tp));
+                long committed = subscriptions.committed(tp).offset();
+                log.debug("Resetting offset for partition {} to the committed offset {}", tp, committed);
+                subscriptions.seek(tp, committed);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
index 5f00251..f5c1afc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -100,6 +100,8 @@ public class RequestFuture<T> {
      * @param value corresponding value (or null if there is none)
      */
     public void complete(T value) {
+        if (isDone)
+            throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
         this.value = value;
         this.isDone = true;
         fireSuccess();
@@ -111,6 +113,8 @@ public class RequestFuture<T> {
      * @param e corresponding exception to be passed to caller
      */
     public void raise(RuntimeException e) {
+        if (isDone)
+            throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
         this.exception = e;
         this.isDone = true;
         fireFailure();
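
The two guards above turn double-completion into an explicit programming error
rather than a silent overwrite of the result. Roughly (internal API, shown for
illustration only, assuming the no-arg constructor):

    RequestFuture<Void> future = new RequestFuture<>();
    future.complete(null);                      // first completion succeeds
    future.raise(new RuntimeException("boom")); // now throws IllegalStateException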

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index f976df4..1f4deea 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -13,6 +13,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 
@@ -41,7 +42,7 @@ import java.util.regex.Pattern;
  * assignment is changed whether directly by the user or through a group rebalance.
  *
  * This class also maintains a cache of the latest commit position for each of the assigned
- * partitions. This is updated through {@link #committed(TopicPartition, long)} and can be used
+ * partitions. This is updated through {@link #committed(TopicPartition, OffsetAndMetadata)} and can be used
  * to set the initial fetch position (e.g. {@link Fetcher#resetOffset(TopicPartition)}).
  */
 public class SubscriptionState {
@@ -176,11 +177,11 @@ public class SubscriptionState {
         return state;
     }
 
-    public void committed(TopicPartition tp, long offset) {
+    public void committed(TopicPartition tp, OffsetAndMetadata offset) {
         assignedState(tp).committed(offset);
     }
 
-    public Long committed(TopicPartition tp) {
+    public OffsetAndMetadata committed(TopicPartition tp) {
         return assignedState(tp).committed;
     }
 
@@ -225,12 +226,12 @@ public class SubscriptionState {
         return assignedState(tp).consumed;
     }
 
-    public Map<TopicPartition, Long> allConsumed() {
-        Map<TopicPartition, Long> allConsumed = new HashMap<>();
+    public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
+        Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
         for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) {
             TopicPartitionState state = entry.getValue();
             if (state.hasValidPosition)
-                allConsumed.put(entry.getKey(), state.consumed);
+                allConsumed.put(entry.getKey(), new OffsetAndMetadata(state.consumed));
         }
         return allConsumed;
     }
@@ -311,7 +312,7 @@ public class SubscriptionState {
     private static class TopicPartitionState {
         private Long consumed;   // offset exposed to the user
         private Long fetched;    // current fetch position
-        private Long committed;  // last committed position
+        private OffsetAndMetadata committed;  // last committed position
 
         private boolean hasValidPosition; // whether we have valid consumed and fetched positions
         private boolean paused;  // whether this partition has been paused by the user
@@ -356,7 +357,7 @@ public class SubscriptionState {
             this.consumed = offset;
         }
 
-        private void committed(Long offset) {
+        private void committed(OffsetAndMetadata offset) {
             this.committed = offset;
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index d6e6386..03df1e7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -12,12 +12,6 @@
  */
 package org.apache.kafka.common.requests;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -26,6 +20,12 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * This wrapper supports both v0 and v1 of OffsetCommitRequest.
  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 24e3d81..f702535 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -45,7 +45,7 @@ public class MockConsumerTest {
         assertFalse(iter.hasNext());
         assertEquals(1L, consumer.position(new TopicPartition("test", 0)));
         consumer.commitSync();
-        assertEquals(1L, consumer.committed(new TopicPartition("test", 0)));
+        assertEquals(1L, consumer.committed(new TopicPartition("test", 0)).offset());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index 490578e..82586ac 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
@@ -31,6 +32,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -302,16 +304,38 @@ public class CoordinatorTest {
     }
 
     @Test
-    public void testCommitOffsetNormal() {
+    public void testCommitOffsetOnly() {
+        subscriptions.assign(Arrays.asList(tp));
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+
+        AtomicBoolean success = new AtomicBoolean(false);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
+        consumerClient.poll(0);
+        assertTrue(success.get());
+
+        assertEquals(100L, subscriptions.committed(tp).offset());
+    }
+
+    @Test
+    public void testCommitOffsetMetadata() {
+        subscriptions.assign(Arrays.asList(tp));
+
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorKnown();
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
 
         AtomicBoolean success = new AtomicBoolean(false);
-        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, 100L), callback(success));
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "hello")), callback(success));
         consumerClient.poll(0);
         assertTrue(success.get());
+
+        assertEquals(100L, subscriptions.committed(tp).offset());
+        assertEquals("hello", subscriptions.committed(tp).metadata());
     }
 
     @Test
@@ -320,7 +344,7 @@ public class CoordinatorTest {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorKnown();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, 100L), null);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
         consumerClient.poll(0);
         assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
         assertNull(defaultOffsetCommitCallback.exception);
@@ -332,7 +356,7 @@ public class CoordinatorTest {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorKnown();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
-        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, 100L), null);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
         consumerClient.poll(0);
         assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
         assertEquals(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception(), defaultOffsetCommitCallback.exception);
@@ -346,7 +370,7 @@ public class CoordinatorTest {
         // async commit with coordinator not available
         MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
-        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, 100L), cb);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
         consumerClient.poll(0);
 
         assertTrue(coordinator.coordinatorUnknown());
@@ -362,7 +386,7 @@ public class CoordinatorTest {
         // async commit with not coordinator
         MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
-        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, 100L), cb);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
         consumerClient.poll(0);
 
         assertTrue(coordinator.coordinatorUnknown());
@@ -378,7 +402,7 @@ public class CoordinatorTest {
         // async commit with coordinator disconnected
         MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
-        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, 100L), cb);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
         consumerClient.poll(0);
 
         assertTrue(coordinator.coordinatorUnknown());
@@ -395,7 +419,7 @@ public class CoordinatorTest {
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, 100L));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
     }
 
     @Test
@@ -407,7 +431,7 @@ public class CoordinatorTest {
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, 100L));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
     }
 
     @Test
@@ -419,7 +443,17 @@ public class CoordinatorTest {
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, 100L));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
+    }
+
+    @Test(expected = OffsetMetadataTooLarge.class)
+    public void testCommitOffsetMetadataTooLarge() {
+        // since offset metadata is provided by the user, we have to propagate the exception so they can handle it
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.OFFSET_METADATA_TOO_LARGE.code())));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
     }
 
     @Test(expected = ApiException.class)
@@ -429,7 +463,7 @@ public class CoordinatorTest {
 
         // sync commit with invalid partitions should throw if we have no callback
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code())), false);
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, 100L));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
     }
 
     @Test
@@ -442,7 +476,7 @@ public class CoordinatorTest {
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
         assertFalse(subscriptions.refreshCommitsNeeded());
-        assertEquals(100L, (long) subscriptions.committed(tp));
+        assertEquals(100L, subscriptions.committed(tp).offset());
     }
 
     @Test
@@ -456,7 +490,7 @@ public class CoordinatorTest {
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
         assertFalse(subscriptions.refreshCommitsNeeded());
-        assertEquals(100L, (long) subscriptions.committed(tp));
+        assertEquals(100L, subscriptions.committed(tp).offset());
     }
 
     @Test
@@ -471,7 +505,7 @@ public class CoordinatorTest {
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
         assertFalse(subscriptions.refreshCommitsNeeded());
-        assertEquals(100L, (long) subscriptions.committed(tp));
+        assertEquals(100L, subscriptions.committed(tp).offset());
     }
 
     @Test
@@ -516,7 +550,7 @@ public class CoordinatorTest {
     private OffsetCommitCallback callback(final AtomicBoolean success) {
         return new OffsetCommitCallback() {
             @Override
-            public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
+            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                 if (exception == null)
                     success.set(true);
             }
@@ -528,7 +562,7 @@ public class CoordinatorTest {
         public Exception exception = null;
 
         @Override
-        public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
+        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
             invoked++;
             this.exception = exception;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index a4f6c19..17ec2ce 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
@@ -227,7 +228,7 @@ public class FetcherTest {
         // unless a specific reset is expected, the default behavior is to reset to the committed
         // position if one is present
         subscriptions.assign(Arrays.asList(tp));
-        subscriptions.committed(tp, 5);
+        subscriptions.committed(tp, new OffsetAndMetadata(5));
 
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertTrue(subscriptions.isFetchable(tp));

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index a279faa..a0568ad 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.regex.Pattern;
 
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
@@ -42,7 +43,7 @@ public class SubscriptionStateTest {
     public void partitionAssignment() {
         state.assign(Arrays.asList(tp0));
         assertEquals(Collections.singleton(tp0), state.assignedPartitions());
-        state.committed(tp0, 1);
+        state.committed(tp0, new OffsetAndMetadata(1));
         state.seek(tp0, 1);
         assertTrue(state.isFetchable(tp0));
         assertAllPositions(tp0, 1L);
@@ -78,7 +79,7 @@ public class SubscriptionStateTest {
         assertTrue(state.partitionsAutoAssigned());
         state.changePartitionAssignment(asList(tp0));
         state.seek(tp0, 1);
-        state.committed(tp0, 1);
+        state.committed(tp0, new OffsetAndMetadata(1));
         assertAllPositions(tp0, 1L);
         state.changePartitionAssignment(asList(tp1));
         assertTrue(state.isAssigned(tp1));
@@ -99,6 +100,15 @@ public class SubscriptionStateTest {
     }
 
     @Test
+    public void commitOffsetMetadata() {
+        state.assign(Arrays.asList(tp0));
+        state.committed(tp0, new OffsetAndMetadata(5, "hi"));
+
+        assertEquals(5, state.committed(tp0).offset());
+        assertEquals("hi", state.committed(tp0).metadata());
+    }
+
+    @Test
     public void topicUnsubscription() {
         final String topic = "test";
         state.subscribe(Arrays.asList(topic), rebalanceListener);
@@ -106,7 +116,7 @@ public class SubscriptionStateTest {
         assertTrue(state.assignedPartitions().isEmpty());
         assertTrue(state.partitionsAutoAssigned());
         state.changePartitionAssignment(asList(tp0));
-        state.committed(tp0, 1);
+        state.committed(tp0, new OffsetAndMetadata(1));
         state.seek(tp0, 1);
         assertAllPositions(tp0, 1L);
         state.changePartitionAssignment(asList(tp1));
@@ -143,7 +153,7 @@ public class SubscriptionStateTest {
     }
     
     public void assertAllPositions(TopicPartition tp, Long offset) {
-        assertEquals(offset, state.committed(tp));
+        assertEquals(offset.longValue(), state.committed(tp).offset());
         assertEquals(offset, state.fetched(tp));
         assertEquals(offset, state.consumed(tp));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
index 49fbbe9..bf5d152 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
@@ -16,6 +16,7 @@
  **/
 package org.apache.kafka.copycat.sink;
 
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.copycat.connector.Task;
@@ -60,5 +61,5 @@ public abstract class SinkTask implements Task {
      *
      * @param offsets mapping of TopicPartition to committed offset
      */
-    public abstract void flush(Map<TopicPartition, Long> offsets);
+    public abstract void flush(Map<TopicPartition, OffsetAndMetadata> offsets);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
index 870a952..9ea459c 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.copycat.file;
 
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.sink.SinkRecord;
@@ -69,7 +70,7 @@ public class FileStreamSinkTask extends SinkTask {
     }
 
     @Override
-    public void flush(Map<TopicPartition, Long> offsets) {
+    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
         outputStream.flush();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java
index 1dfb5d8..ac8b5f1 100644
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.copycat.file;
 
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.copycat.data.Schema;
 import org.apache.kafka.copycat.sink.SinkRecord;
@@ -45,14 +46,14 @@ public class FileStreamSinkTaskTest {
 
     @Test
     public void testPutFlush() {
-        HashMap<TopicPartition, Long> offsets = new HashMap<>();
+        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
 
         // We do not call task.start() since it would override the output stream
 
         task.put(Arrays.asList(
                 new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line1", 1)
         ));
-        offsets.put(new TopicPartition("topic1", 0), 1L);
+        offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(1L));
         task.flush(offsets);
         assertEquals("line1\n", os.toString());
 
@@ -60,8 +61,8 @@ public class FileStreamSinkTaskTest {
                 new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line2", 2),
                 new SinkRecord("topic2", 0, null, null, Schema.STRING_SCHEMA, "line3", 1)
         ));
-        offsets.put(new TopicPartition("topic1", 0), 2L);
-        offsets.put(new TopicPartition("topic2", 0), 1L);
+        offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(2L));
+        offsets.put(new TopicPartition("topic2", 0), new OffsetAndMetadata(1L));
         task.flush(offsets);
         assertEquals("line1\nline2\nline3\n", os.toString());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index 1047213..cbda201 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -122,9 +122,9 @@ class WorkerSinkTask implements WorkerTask {
      **/
     public void commitOffsets(long now, boolean sync, final int seqno, boolean flush) {
         log.info("{} Committing offsets", this);
-        HashMap<TopicPartition, Long> offsets = new HashMap<>();
+        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         for (TopicPartition tp : consumer.assignment()) {
-            offsets.put(tp, consumer.position(tp));
+            offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
         }
         // We only don't flush the task in one case: when shutting down, the task has already been
         // stopped and all data should have already been flushed
@@ -147,7 +147,7 @@ class WorkerSinkTask implements WorkerTask {
         } else {
             OffsetCommitCallback cb = new OffsetCommitCallback() {
                 @Override
-                public void onComplete(Map<TopicPartition, Long> offsets, Exception error) {
+                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
                     workThread.onCommitCompleted(error, seqno);
                 }
             };

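For reference, the commit-from-positions pattern this hunk implements, condensed into a standalone sketch (the CommitPositions wrapper class is illustrative; the consumer calls are the client API as changed by this patch):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.OffsetCommitCallback;
    import org.apache.kafka.common.TopicPartition;

    public class CommitPositions {
        // Snapshot the current position of every assigned partition, wrap
        // each in an OffsetAndMetadata, and commit asynchronously.
        public static void commitPositions(Consumer<byte[], byte[]> consumer) {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition tp : consumer.assignment())
                offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));

            consumer.commitAsync(offsets, new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed,
                                       Exception error) {
                    if (error != null)
                        System.err.println("Offset commit failed: " + error);
                }
            });
        }
    }
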
http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index 91469d3..687ed8f 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -346,7 +346,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
                 }
         );
 
-        sinkTask.flush(Collections.singletonMap(TOPIC_PARTITION, finalOffset));
+        sinkTask.flush(Collections.singletonMap(TOPIC_PARTITION, new OffsetAndMetadata(finalOffset)));
         IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall();
         if (flushError != null) {
             flushExpectation.andThrow(flushError).once();
@@ -354,7 +354,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         }
 
         final Capture<OffsetCommitCallback> capturedCallback = EasyMock.newCapture();
-        final Map<TopicPartition, Long> offsets = Collections.singletonMap(TOPIC_PARTITION, finalOffset);
+        final Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(TOPIC_PARTITION, new OffsetAndMetadata(finalOffset));
         consumer.commitAsync(EasyMock.eq(offsets),
                 EasyMock.capture(capturedCallback));
         PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 28a3e90..3f17e79 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -87,7 +87,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
       }
 
       consumer.commitSync()
-      assertEquals(consumer.position(tp), consumer.committed(tp))
+      assertEquals(consumer.position(tp), consumer.committed(tp).offset)
 
       if (consumer.position(tp) == numRecords) {
         consumer.seekToBeginning()
@@ -131,7 +131,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
       } else if (coin == 2) {
         info("Committing offset.")
         consumer.commitSync()
-        assertEquals(consumer.position(tp), consumer.committed(tp))
+        assertEquals(consumer.position(tp), consumer.committed(tp).offset)
       }
     }
   }

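These assertions change because committed() now returns a full OffsetAndMetadata rather than a bare offset, so callers compare through the offset accessor. The same pattern as a standalone sketch (the CommittedCheck wrapper is illustrative):

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CommittedCheck {
        // After a sync commit, the committed offset should equal the position.
        public static void check(Consumer<byte[], byte[]> consumer, TopicPartition tp) {
            consumer.commitSync();
            OffsetAndMetadata committed = consumer.committed(tp);
            if (consumer.position(tp) != committed.offset())
                throw new IllegalStateException("committed offset lags position");
        }
    }
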
http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index 1b1973f..c59b95b 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -186,23 +186,23 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     this.consumers(0).poll(50)
     val pos1 = this.consumers(0).position(tp)
     val pos2 = this.consumers(0).position(tp2)
-    this.consumers(0).commitSync(Map[TopicPartition,java.lang.Long]((tp, 3L)).asJava)
-    assertEquals(3, this.consumers(0).committed(tp))
+    this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
+    assertEquals(3, this.consumers(0).committed(tp).offset)
     intercept[NoOffsetForPartitionException] {
       this.consumers(0).committed(tp2)
     }
     // positions should not change
     assertEquals(pos1, this.consumers(0).position(tp))
     assertEquals(pos2, this.consumers(0).position(tp2))
-    this.consumers(0).commitSync(Map[TopicPartition,java.lang.Long]((tp2, 5L)).asJava)
-    assertEquals(3, this.consumers(0).committed(tp))
-    assertEquals(5, this.consumers(0).committed(tp2))
+    this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
+    assertEquals(3, this.consumers(0).committed(tp).offset)
+    assertEquals(5, this.consumers(0).committed(tp2).offset)
 
     // Using async should pick up the committed changes after commit completes
     val commitCallback = new CountConsumerCommitCallback()
-    this.consumers(0).commitAsync(Map[TopicPartition,java.lang.Long]((tp2, 7L)).asJava, commitCallback)
+    this.consumers(0).commitAsync(Map[TopicPartition,OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback)
     awaitCommitCallback(this.consumers(0), commitCallback)
-    assertEquals(7, this.consumers(0).committed(tp2))
+    assertEquals(7, this.consumers(0).committed(tp2).offset)
   }
 
   @Test
@@ -240,7 +240,26 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
   }
 
+
   @Test
+  def testCommitMetadata() {
+    this.consumers(0).assign(List(tp))
+
+    // sync commit
+    val syncMetadata = new OffsetAndMetadata(5, "foo")
+    this.consumers(0).commitSync(Map((tp, syncMetadata)))
+    assertEquals(syncMetadata, this.consumers(0).committed(tp))
+
+    // async commit
+    val asyncMetadata = new OffsetAndMetadata(10, "bar")
+    val callback = new CountConsumerCommitCallback
+    this.consumers(0).commitAsync(Map((tp, asyncMetadata)), callback)
+    awaitCommitCallback(this.consumers(0), callback)
+
+    assertEquals(asyncMetadata, this.consumers(0).committed(tp))
+  }
+
+  @Test
   def testPositionAndCommit() {
     sendRecords(5)
 
@@ -258,12 +276,12 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
 
     assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
     this.consumers(0).commitSync()
-    assertEquals(0L, this.consumers(0).committed(tp))
+    assertEquals(0L, this.consumers(0).committed(tp).offset)
 
     consumeRecords(this.consumers(0), 5, 0)
     assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
     this.consumers(0).commitSync()
-    assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp))
+    assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp).offset)
 
     sendRecords(1)
 
@@ -473,14 +491,14 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     val startCount = commitCallback.count
     val started = System.currentTimeMillis()
     while (commitCallback.count == startCount && System.currentTimeMillis() - started < 10000)
-      this.consumers(0).poll(10000)
+      this.consumers(0).poll(50)
     assertEquals(startCount + 1, commitCallback.count)
   }
 
   private class CountConsumerCommitCallback extends OffsetCommitCallback {
     var count = 0
 
-    override def onComplete(offsets: util.Map[TopicPartition, lang.Long], exception: Exception): Unit = count += 1
+    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = count += 1
   }
 
 }
\ No newline at end of file

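The new testCommitMetadata above is the heart of the change: a metadata string committed with an offset survives the round trip through the broker and comes back from committed(). The same round trip against the Java API, as a sketch (the wrapper class, partition argument, and "foo" string are illustrative):

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class MetadataRoundTrip {
        // Commit offset 5 with metadata "foo", then read the pair back.
        public static void roundTrip(Consumer<String, String> consumer, TopicPartition tp) {
            OffsetAndMetadata om = new OffsetAndMetadata(5L, "foo");
            consumer.commitSync(Collections.singletonMap(tp, om));
            // equals() compares offset and metadata, mirroring the test's
            // assertEquals on the returned OffsetAndMetadata.
            if (!om.equals(consumer.committed(tp)))
                throw new IllegalStateException("metadata did not round-trip");
        }
    }
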
http://git-wip-us.apache.org/repos/asf/kafka/blob/d1dd1e90/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
index df9b3d8..65e3d71 100644
--- a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
@@ -183,12 +183,12 @@ class SSLConsumerTest extends KafkaServerTestHarness with Logging {
 
     assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
     this.consumers(0).commitSync()
-    assertEquals(0L, this.consumers(0).committed(tp))
+    assertEquals(0L, this.consumers(0).committed(tp).offset)
 
     consumeRecords(this.consumers(0), 5, 0)
     assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
     this.consumers(0).commitSync()
-    assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp))
+    assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp).offset)
 
     sendRecords(1)
 

