kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-5731; Corrected how the sink task worker updates the last committed offsets
Date Tue, 15 Aug 2017 21:17:41 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 869ef5579 -> 3b1cea60e


KAFKA-5731; Corrected how the sink task worker updates the last committed offsets

Prior to this change, it was possible for the synchronous consumer commit request to be handled
before previously submitted asynchronous commit requests. If that happened, the out-of-order
handlers improperly set the last committed offsets, leaving them inconsistent with the offsets
the connector task was actually working with.

This change ensures that the last committed offsets are updated only for the most recent commit
request, even if the consumer reorders the calls to the callbacks.
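
For illustration only, here is a minimal sketch of the sequence-number guard described above
(not the actual WorkerSinkTask code; the CommitGuard class and its method names are
hypothetical, and offsets are keyed by plain strings rather than TopicPartition to keep the
example self-contained):

    import java.util.HashMap;
    import java.util.Map;

    class CommitGuard {
        private long commitSeqno = 0;                      // tag of the most recent commit request
        private Map<String, Long> lastCommittedOffsets = new HashMap<>();

        // Called when a new commit is started; the returned tag travels with its callback.
        synchronized long requestCommit() {
            return ++commitSeqno;
        }

        // Called from a commit callback, which the consumer may invoke out of order.
        synchronized void onCommitCompleted(Throwable error, long seqno,
                                            Map<String, Long> committedOffsets) {
            if (seqno != commitSeqno) {
                return; // stale callback: a newer commit has been requested since, so ignore it
            }
            if (error == null && committedOffsets != null) {
                lastCommittedOffsets = committedOffsets; // only the latest commit updates state
            }
        }
    }

A late asynchronous callback thus compares its tag against the most recent one and simply
drops its result, so it can no longer clobber offsets recorded by a newer commit.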

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3662 from rhauch/kafka-5731


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3b1cea60
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3b1cea60
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3b1cea60

Branch: refs/heads/trunk
Commit: 3b1cea60e909e579e54775fc479397ddbeba95eb
Parents: 869ef55
Author: Randall Hauch <rhauch@gmail.com>
Authored: Tue Aug 15 14:16:00 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Tue Aug 15 14:16:00 2017 -0700

----------------------------------------------------------------------
 checkstyle/suppressions.xml                     |   4 +-
 .../kafka/connect/runtime/WorkerSinkTask.java   | 102 ++++++----
 .../connect/runtime/WorkerSinkTaskTest.java     | 200 ++++++++++++++++++-
 3 files changed, 262 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3b1cea60/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index cca1429..7f3c4b6 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -48,7 +48,7 @@
               files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer).java"/>
 
     <suppress checks="JavaNCSS"
-              files="AbstractRequest.java|KerberosLogin.java"/>
+              files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java"/>
 
     <suppress checks="JavaNCSS"
               files="AbstractRequest.java"/>
@@ -71,7 +71,7 @@
               files="DistributedHerder(|Test).java"/>
 
     <suppress checks="MethodLength"
-              files="(KafkaConfigBackingStore|RequestResponseTest).java"/>
+              files="(KafkaConfigBackingStore|RequestResponseTest|WorkerSinkTaskTest).java"/>
 
     <suppress checks="ParameterNumber"
               files="WorkerSourceTask.java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b1cea60/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 9295712..0c8fcf6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -179,7 +179,7 @@ class WorkerSinkTask extends WorkerTask {
             long timeoutMs = Math.max(nextCommit - now, 0);
             poll(timeoutMs);
         } catch (WakeupException we) {
-            log.trace("{} consumer woken up", this);
+            log.trace("{} Consumer woken up", this);
 
             if (isStopping())
                 return;
@@ -195,16 +195,31 @@ class WorkerSinkTask extends WorkerTask {
         }
     }
 
-    private void onCommitCompleted(Throwable error, long seqno) {
+    /**
+     * Respond to a previous commit attempt that may or may not have succeeded. Note that due to our use of async commits,
+     * these invocations may come out of order and thus the need for the commit sequence number.
+     *
+     * @param error            the error resulting from the commit, or null if the commit succeeded without error
+     * @param seqno            the sequence number at the time the commit was requested
+     * @param committedOffsets the offsets that were committed; may be null if the commit did not complete successfully
+     *                         or if no new offsets were committed
+     */
+    private void onCommitCompleted(Throwable error, long seqno, Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
         if (commitSeqno != seqno) {
-            log.debug("{} Got callback for timed out commit: {}, but most recent commit is {}", this, seqno, commitSeqno);
+            log.debug("{} Received out of order commit callback for sequence number {}, but most recent sequence number is {}",
+                    this, seqno, commitSeqno);
         } else {
             if (error != null) {
-                log.error("{} Commit of offsets threw an unexpected exception: ", this, error);
+                log.error("{} Commit of offsets threw an unexpected exception for sequence
number {}: {}",
+                        this, seqno, committedOffsets, error);
                 commitFailures++;
             } else {
-                log.debug("{} Finished offset commit successfully in {} ms",
-                        this, time.milliseconds() - commitStarted);
+                log.debug("{} Finished offset commit successfully in {} ms for sequence number
{}: {}",
+                        this, time.milliseconds() - commitStarted, seqno, committedOffsets);
+                if (committedOffsets != null) {
+                    log.debug("{} Setting last committed offsets to {}", this, committedOffsets);
+                    lastCommittedOffsets = committedOffsets;
+                }
                 commitFailures = 0;
             }
             committing = false;
@@ -219,13 +234,12 @@ class WorkerSinkTask extends WorkerTask {
      * Initializes and starts the SinkTask.
      */
     protected void initializeAndStart() {
-        log.debug("{} Initializing task", this);
         String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG);
         if (topicsStr == null || topicsStr.isEmpty())
             throw new ConnectException("Sink tasks require a list of topics.");
         String[] topics = topicsStr.split(",");
-        log.debug("{} Task subscribing to topics {}", this, topics);
         consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
+        log.debug("{} Initializing and starting task for topics {}", this, topics);
         task.initialize(context);
         task.start(taskConfig);
         log.info("{} Sink task finished initialization and start", this);
@@ -240,10 +254,10 @@ class WorkerSinkTask extends WorkerTask {
             context.timeout(-1L);
         }
 
-        log.trace("{} polling consumer with timeout {} ms", this, timeoutMs);
+        log.trace("{} Polling consumer with timeout {} ms", this, timeoutMs);
         ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
         assert messageBatch.isEmpty() || msgs.isEmpty();
-        log.trace("{} polling returned {} messages", this, msgs.count());
+        log.trace("{} Polling returned {} messages", this, msgs.count());
 
         convertMessages(msgs);
         deliverMessages();
@@ -255,38 +269,39 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> offsets, int seqno) {
+        log.info("{} Committing offsets synchronously using sequence number {}: {}", this, seqno, offsets);
         try {
             consumer.commitSync(offsets);
-            lastCommittedOffsets = offsets;
-            onCommitCompleted(null, seqno);
+            onCommitCompleted(null, seqno, offsets);
         } catch (WakeupException e) {
             // retry the commit to ensure offsets get pushed, then propagate the wakeup up to poll
             doCommitSync(offsets, seqno);
             throw e;
         } catch (KafkaException e) {
-            onCommitCompleted(e, seqno);
+            onCommitCompleted(e, seqno, offsets);
         }
     }
 
+    private void doCommitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, final int seqno) {
+        log.info("{} Committing offsets asynchronously using sequence number {}: {}", this, seqno, offsets);
+        OffsetCommitCallback cb = new OffsetCommitCallback() {
+            @Override
+            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
+                onCommitCompleted(error, seqno, offsets);
+            }
+        };
+        consumer.commitAsync(offsets, cb);
+    }
+
     /**
      * Starts an offset commit by flushing outstanding messages from the task and then starting
      * the write commit.
      **/
-    private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, final int seqno) {
-        log.info("{} Committing offsets", this);
+    private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, int seqno) {
         if (closing) {
             doCommitSync(offsets, seqno);
         } else {
-            OffsetCommitCallback cb = new OffsetCommitCallback() {
-                @Override
-                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
-                    if (error == null) {
-                        lastCommittedOffsets = offsets;
-                    }
-                    onCommitCompleted(error, seqno);
-                }
-            };
-            consumer.commitAsync(offsets, cb);
+            doCommitAsync(offsets, seqno);
         }
     }
 
@@ -300,11 +315,12 @@ class WorkerSinkTask extends WorkerTask {
 
         final Map<TopicPartition, OffsetAndMetadata> taskProvidedOffsets;
         try {
+            log.trace("{} Calling task.preCommit with current offsets: {}", this, currentOffsets);
             taskProvidedOffsets = task.preCommit(new HashMap<>(currentOffsets));
         } catch (Throwable t) {
             if (closing) {
                 log.warn("{} Offset commit failed during close", this);
-                onCommitCompleted(t, commitSeqno);
+                onCommitCompleted(t, commitSeqno, null);
             } else {
                 log.error("{} Offset commit failed, rewinding to last committed offsets",
this, t);
                 for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet())
{
@@ -312,18 +328,19 @@ class WorkerSinkTask extends WorkerTask {
                     consumer.seek(entry.getKey(), entry.getValue().offset());
                 }
                 currentOffsets = new HashMap<>(lastCommittedOffsets);
-                onCommitCompleted(t, commitSeqno);
+                onCommitCompleted(t, commitSeqno, null);
             }
             return;
         } finally {
-            // Close the task if needed before committing the offsets.
-            if (closing)
+            if (closing) {
+                log.trace("{} Closing the task before committing the offsets: {}", this,
currentOffsets);
                 task.close(currentOffsets.keySet());
+            }
         }
 
         if (taskProvidedOffsets.isEmpty()) {
-            log.debug("{} Skipping offset commit, task opted-out", this);
-            onCommitCompleted(null, commitSeqno);
+            log.debug("{} Skipping offset commit, task opted-out by returning no offsets
from preCommit", this);
+            onCommitCompleted(null, commitSeqno, null);
             return;
         }
 
@@ -348,11 +365,10 @@ class WorkerSinkTask extends WorkerTask {
 
         if (commitableOffsets.equals(lastCommittedOffsets)) {
             log.debug("{} Skipping offset commit, no change since last commit", this);
-            onCommitCompleted(null, commitSeqno);
+            onCommitCompleted(null, commitSeqno, null);
             return;
         }
 
-        log.trace("{} Offsets to commit: {}", this, commitableOffsets);
         doCommit(commitableOffsets, closing, commitSeqno);
     }
 
@@ -404,18 +420,25 @@ class WorkerSinkTask extends WorkerTask {
 
     private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
         for (ConsumerRecord<byte[], byte[]> msg : msgs) {
-            log.trace("{} Consuming message with key {}, value {}", this, msg.key(), msg.value());
+            log.trace("{} Consuming and converting message in topic '{}' partition {} at
offset {} and timestamp {}",
+                    this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp());
             SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
             SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
+            Long timestamp = ConnectUtils.checkAndConvertTimestamp(msg.timestamp());
             SinkRecord record = new SinkRecord(msg.topic(), msg.partition(),
                     keyAndSchema.schema(), keyAndSchema.value(),
                     valueAndSchema.schema(), valueAndSchema.value(),
                     msg.offset(),
-                    ConnectUtils.checkAndConvertTimestamp(msg.timestamp()),
+                    timestamp,
                     msg.timestampType());
+            log.trace("{} Applying transformations to record in topic '{}' partition {} at
offset {} and timestamp {} with key {} and value {}",
+                    this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(),
valueAndSchema.value());
             record = transformationChain.apply(record);
             if (record != null) {
                 messageBatch.add(record);
+            } else {
+                log.trace("{} Transformations returned null, so dropping record in topic
'{}' partition {} at offset {} and timestamp {} with key {} and value {}",
+                        this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(),
valueAndSchema.value());
             }
         }
     }
@@ -434,6 +457,7 @@ class WorkerSinkTask extends WorkerTask {
         // Finally, deliver this batch to the sink
         try {
             // Since we reuse the messageBatch buffer, ensure we give the task its own copy
+            log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
             task.put(new ArrayList<>(messageBatch));
             for (SinkRecord record : messageBatch)
                 currentOffsets.put(new TopicPartition(record.topic(), record.kafkaPartition()),
@@ -469,12 +493,12 @@ class WorkerSinkTask extends WorkerTask {
             TopicPartition tp = entry.getKey();
             Long offset = entry.getValue();
             if (offset != null) {
-                log.trace("{} Rewind {} to offset {}.", this, tp, offset);
+                log.trace("{} Rewind {} to offset {}", this, tp, offset);
                 consumer.seek(tp, offset);
                 lastCommittedOffsets.put(tp, new OffsetAndMetadata(offset));
                 currentOffsets.put(tp, new OffsetAndMetadata(offset));
             } else {
-                log.warn("{} Cannot rewind {} to null offset.", this, tp);
+                log.warn("{} Cannot rewind {} to null offset", this, tp);
             }
         }
         context.clearOffsets();
@@ -491,13 +515,14 @@ class WorkerSinkTask extends WorkerTask {
     private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            log.debug("{} Partitions assigned", WorkerSinkTask.this);
             lastCommittedOffsets = new HashMap<>();
             currentOffsets = new HashMap<>();
             for (TopicPartition tp : partitions) {
                 long pos = consumer.position(tp);
                 lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
                 currentOffsets.put(tp, new OffsetAndMetadata(pos));
-                log.debug("{} assigned topic partition {} with offset {}", this, tp, pos);
+                log.debug("{} Assigned topic partition {} with offset {}", this, tp, pos);
             }
 
             // If we paused everything for redelivery (which is no longer relevant since we discarded the data), make
@@ -530,6 +555,7 @@ class WorkerSinkTask extends WorkerTask {
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            log.debug("{} Partitions revoked", WorkerSinkTask.this);
             try {
                 closePartitions();
             } catch (RuntimeException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b1cea60/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 50b7503..3ab6e06 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -64,6 +64,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
@@ -82,6 +84,7 @@ public class WorkerSinkTaskTest {
     private static final String TOPIC = "test";
     private static final int PARTITION = 12;
     private static final int PARTITION2 = 13;
+    private static final int PARTITION3 = 14;
     private static final long FIRST_OFFSET = 45;
     private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
     private static final int KEY = 12;
@@ -92,6 +95,7 @@ public class WorkerSinkTaskTest {
 
     private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
     private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
+    private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);
 
     private static final Map<String, String> TASK_PROPS = new HashMap<>();
     static {
@@ -122,7 +126,8 @@ public class WorkerSinkTaskTest {
     private KafkaConsumer<byte[], byte[]> consumer;
     private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
 
-    private long recordsReturned;
+    private long recordsReturnedTp1;
+    private long recordsReturnedTp3;
 
     @Before
     public void setUp() {
@@ -141,7 +146,8 @@ public class WorkerSinkTaskTest {
                 WorkerSinkTask.class, new String[]{"createConsumer"},
                 taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, transformationChain, pluginLoader, time);
 
-        recordsReturned = 0;
+        recordsReturnedTp1 = 0;
+        recordsReturnedTp3 = 0;
     }
 
     @Test
@@ -636,6 +642,189 @@ public class WorkerSinkTaskTest {
         PowerMock.verifyAll();
     }
 
+    // Verify that when commitAsync is called but the supplied callback is not called by the consumer before a
+    // rebalance occurs, the async callback does not reset the last committed offset from the rebalance.
+    // See KAFKA-5731 for more information.
+    @Test
+    public void testCommitWithOutOfOrderCallback() throws Exception {
+        expectInitializeTask();
+
+        // iter 1
+        expectPollInitialAssignment();
+
+        // iter 2
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(4);
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+
+        final Map<TopicPartition, OffsetAndMetadata> workerStartingOffsets = new HashMap<>();
+        workerStartingOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
+        workerStartingOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+
+        final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
+        workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+        workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+
+        final List<TopicPartition> originalPartitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);
+        final List<TopicPartition> rebalancedPartitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2,
TOPIC_PARTITION3);
+        final Map<TopicPartition, OffsetAndMetadata> rebalanceOffsets = new HashMap<>();
+        rebalanceOffsets.put(TOPIC_PARTITION, workerCurrentOffsets.get(TOPIC_PARTITION));
+        rebalanceOffsets.put(TOPIC_PARTITION2, workerCurrentOffsets.get(TOPIC_PARTITION2));
+        rebalanceOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET));
+
+        final Map<TopicPartition, OffsetAndMetadata> postRebalanceCurrentOffsets =
new HashMap<>();
+        postRebalanceCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET
+ 3));
+        postRebalanceCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        postRebalanceCurrentOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET
+ 2));
+
+        // iter 3 - note that we return the current offset to indicate they should be committed
+        sinkTask.preCommit(workerCurrentOffsets);
+        EasyMock.expectLastCall().andReturn(workerCurrentOffsets);
+
+        // We need to delay the result of trying to commit offsets to Kafka via the consumer.commitAsync
+        // method. We do this so that we can test that the callback is not called until after the rebalance
+        // changes the lastCommittedOffsets. To fake this for tests we have the commitAsync build a function
+        // that will call the callback with the appropriate parameters, and we'll run that function later.
+        final AtomicReference<Runnable> asyncCallbackRunner = new AtomicReference<>();
+        final AtomicBoolean asyncCallbackRan = new AtomicBoolean();
+
+        consumer.commitAsync(EasyMock.eq(workerCurrentOffsets), EasyMock.<OffsetCommitCallback>anyObject());
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public Void answer() throws Throwable {
+                // Grab the arguments passed to the consumer.commitAsync method
+                final Object[] args = EasyMock.getCurrentArguments();
+                final Map<TopicPartition, OffsetAndMetadata> offsets = (Map<TopicPartition,
OffsetAndMetadata>) args[0];
+                final OffsetCommitCallback callback = (OffsetCommitCallback) args[1];
+                asyncCallbackRunner.set(new Runnable() {
+                    @Override
+                    public void run() {
+                        callback.onComplete(offsets, null);
+                        asyncCallbackRan.set(true);
+                    }
+                });
+                return null;
+            }
+        });
+
+        // Expect the next poll to discover and perform the rebalance, THEN complete the previous callback handler,
+        // and then return one record for TP1 and one for TP3.
+        final AtomicBoolean rebalanced = new AtomicBoolean();
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+                new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+                    @Override
+                    public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+                        // Rebalance always begins with revoking current partitions ...
+                        rebalanceListener.getValue().onPartitionsRevoked(originalPartitions);
+                        // Respond to the rebalance
+                        Map<TopicPartition, Long> offsets = new HashMap<>();
+                        offsets.put(TOPIC_PARTITION, rebalanceOffsets.get(TOPIC_PARTITION).offset());
+                        offsets.put(TOPIC_PARTITION2, rebalanceOffsets.get(TOPIC_PARTITION2).offset());
+                        offsets.put(TOPIC_PARTITION3, rebalanceOffsets.get(TOPIC_PARTITION3).offset());
+                        sinkTaskContext.getValue().offset(offsets);
+                        rebalanceListener.getValue().onPartitionsAssigned(rebalancedPartitions);
+                        rebalanced.set(true);
+
+                        // Run the previous async commit handler
+                        asyncCallbackRunner.get().run();
+
+                         // And prep the two records to return
+                        long timestamp = RecordBatch.NO_TIMESTAMP;
+                        TimestampType timestampType = TimestampType.NO_TIMESTAMP_TYPE;
+                        List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+                        records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 1, timestamp, timestampType, 0L, 0, 0, RAW_KEY, RAW_VALUE));
+                        records.add(new ConsumerRecord<>(TOPIC, PARTITION3, FIRST_OFFSET + recordsReturnedTp3 + 1, timestamp, timestampType, 0L, 0, 0, RAW_KEY, RAW_VALUE));
+                        recordsReturnedTp1 += 1;
+                        recordsReturnedTp3 += 1;
+                        return new ConsumerRecords<>(Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records));
+                    }
+                });
+
+        // onPartitionsRevoked
+        sinkTask.preCommit(workerCurrentOffsets);
+        EasyMock.expectLastCall().andReturn(workerCurrentOffsets);
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+        sinkTask.close(workerCurrentOffsets.keySet());
+        EasyMock.expectLastCall();
+        consumer.commitSync(workerCurrentOffsets);
+        EasyMock.expectLastCall();
+
+        // onPartitionsAssigned - step 1
+        final long offsetTp1 = rebalanceOffsets.get(TOPIC_PARTITION).offset();
+        final long offsetTp2 = rebalanceOffsets.get(TOPIC_PARTITION2).offset();
+        final long offsetTp3 = rebalanceOffsets.get(TOPIC_PARTITION3).offset();
+        EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(offsetTp1);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(offsetTp2);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(offsetTp3);
+
+        // onPartitionsAssigned - step 2
+        sinkTask.open(rebalancedPartitions);
+        EasyMock.expectLastCall();
+
+        // onPartitionsAssigned - step 3 rewind
+        consumer.seek(TOPIC_PARTITION, offsetTp1);
+        EasyMock.expectLastCall();
+        consumer.seek(TOPIC_PARTITION2, offsetTp2);
+        EasyMock.expectLastCall();
+        consumer.seek(TOPIC_PARTITION3, offsetTp3);
+        EasyMock.expectLastCall();
+
+        // iter 4 - note that we return the current offset to indicate they should be committed
+        sinkTask.preCommit(postRebalanceCurrentOffsets);
+        EasyMock.expectLastCall().andReturn(postRebalanceCurrentOffsets);
+
+        final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
+        consumer.commitAsync(EasyMock.eq(postRebalanceCurrentOffsets), EasyMock.capture(callback));
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() throws Throwable {
+                callback.getValue().onComplete(postRebalanceCurrentOffsets, null);
+                return null;
+            }
+        });
+
+        // no actual consumer.commit() triggered
+        expectConsumerPoll(1);
+
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.iteration(); // iter 1 -- initial assignment
+
+        assertEquals(workerStartingOffsets, Whitebox.getInternalState(workerTask, "currentOffsets"));
+        assertEquals(workerStartingOffsets, Whitebox.getInternalState(workerTask, "lastCommittedOffsets"));
+
+        time.sleep(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT);
+        workerTask.iteration(); // iter 2 -- deliver 2 records
+
+        sinkTaskContext.getValue().requestCommit();
+        workerTask.iteration(); // iter 3 -- commit in progress
+
+        assertTrue(asyncCallbackRan.get());
+        assertTrue(rebalanced.get());
+
+        // Check that the offsets were not reset by the out-of-order async commit callback
+        assertEquals(postRebalanceCurrentOffsets, Whitebox.getInternalState(workerTask, "currentOffsets"));
+        assertEquals(rebalanceOffsets, Whitebox.getInternalState(workerTask, "lastCommittedOffsets"));
+
+        time.sleep(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT);
+        sinkTaskContext.getValue().requestCommit();
+        workerTask.iteration(); // iter 4 -- commit in progress
+
+        // Check that the offsets were not reset by the out-of-order async commit callback
+        assertEquals(postRebalanceCurrentOffsets, Whitebox.getInternalState(workerTask, "currentOffsets"));
+        assertEquals(postRebalanceCurrentOffsets, Whitebox.getInternalState(workerTask, "lastCommittedOffsets"));
+
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testMissingTimestampPropagation() throws Exception {
         expectInitializeTask();
@@ -705,6 +894,9 @@ public class WorkerSinkTaskTest {
         sinkTask.close(new HashSet<>(partitions));
         EasyMock.expectLastCall().andThrow(e);
 
+        sinkTask.preCommit(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
+        EasyMock.expectLastCall().andReturn(Collections.emptyMap());
+
         EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
@@ -778,8 +970,8 @@ public class WorkerSinkTaskTest {
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                         List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
                         for (int i = 0; i < numMessages; i++)
-                            records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, timestamp, timestampType, 0L, 0, 0, RAW_KEY, RAW_VALUE));
-                        recordsReturned += numMessages;
+                            records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + i, timestamp, timestampType, 0L, 0, 0, RAW_KEY, RAW_VALUE));
+                        recordsReturnedTp1 += numMessages;
                         return new ConsumerRecords<>(
                                 numMessages > 0 ?
                                         Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records) :

