kafka-commits mailing list archives

From: ewe...@apache.org
Subject: kafka git commit: KAFKA-3850: WorkerSinkTask commit prior to rebalance should be retried on wakeup
Date: Sun, 19 Jun 2016 22:32:15 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 c179dee9d -> 968e44cac


KAFKA-3850: WorkerSinkTask commit prior to rebalance should be retried on wakeup
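
For background: KafkaConsumer.wakeup() causes the current or next blocking consumer call,
including commitSync(), to throw WakeupException, and the wakeup flag is consumed when the
exception is raised. The patch therefore retries the interrupted commit so offsets are
flushed before the rebalance proceeds, then rethrows so the poll loop still observes the
wakeup. A minimal standalone sketch of that retry pattern, assuming an already-configured
consumer (the class and helper names here are illustrative, not from the patch):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.WakeupException;

    class CommitRetryExample {
        // Commit offsets, retrying once if a pending wakeup interrupts the call,
        // then rethrow so the caller's poll loop still sees the wakeup.
        static void commitWithWakeupRetry(KafkaConsumer<byte[], byte[]> consumer,
                                          Map<TopicPartition, OffsetAndMetadata> offsets) {
            try {
                consumer.commitSync(offsets);
            } catch (WakeupException e) {
                // The wakeup flag was consumed by the first attempt, so the retry can complete.
                consumer.commitSync(offsets);
                throw e;
            }
        }
    }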

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Liquan Pei <liquanpei@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1511 from hachikuji/retry-commit-on-wakeup-in-sinks

(cherry picked from commit 2c9796114d0a9638be79b4165d0096c7a63babe7)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/968e44ca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/968e44ca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/968e44ca

Branch: refs/heads/0.10.0
Commit: 968e44cac5919b28ebf5fad14a7946e4db943dc0
Parents: c179dee
Author: Jason Gustafson <jason@confluent.io>
Authored: Sun Jun 19 15:31:04 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Sun Jun 19 15:31:14 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/WorkerSinkTask.java   | 27 +++++++---
 .../connect/runtime/WorkerSinkTaskTest.java     | 54 ++++++++++++++++++++
 2 files changed, 73 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/968e44ca/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index f5eaac4..1aef3bb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -228,6 +228,9 @@ class WorkerSinkTask extends WorkerTask {
         } catch (WakeupException we) {
             log.trace("{} consumer woken up", id);
 
+            if (isStopping())
+                return;
+
             if (shouldPause()) {
                 pauseAll();
             } else if (!pausedForRedelivery) {
@@ -236,6 +239,20 @@ class WorkerSinkTask extends WorkerTask {
         }
     }
 
+    private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> offsets, int seqno) {
+        try {
+            consumer.commitSync(offsets);
+            lastCommittedOffsets = offsets;
+            onCommitCompleted(null, seqno);
+        } catch (WakeupException e) {
+            // retry the commit to ensure offsets get pushed, then propagate the wakeup up to poll
+            doCommitSync(offsets, seqno);
+            throw e;
+        } catch (KafkaException e) {
+            onCommitCompleted(e, seqno);
+        }
+    }
+
     /**
      * Starts an offset commit by flushing outstanding messages from the task and then starting
      * the write commit.
@@ -243,13 +260,7 @@ class WorkerSinkTask extends WorkerTask {
     private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, final int seqno) {
         log.info("{} Committing offsets", this);
         if (closing) {
-            try {
-                consumer.commitSync(offsets);
-                lastCommittedOffsets = offsets;
-                onCommitCompleted(null, seqno);
-            } catch (KafkaException e) {
-                onCommitCompleted(e, seqno);
-            }
+            doCommitSync(offsets, seqno);
         } else {
             OffsetCommitCallback cb = new OffsetCommitCallback() {
                 @Override
@@ -448,7 +459,7 @@ class WorkerSinkTask extends WorkerTask {
             // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon
             // task start. Since this callback gets invoked during that initial setup before we've started the task, we
             // need to guard against invoking the user's callback method during that period.
-            if (rebalanceException == null) {
+            if (rebalanceException == null || rebalanceException instanceof WakeupException) {
                 try {
                     openPartitions(partitions);
                 } catch (RuntimeException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/968e44ca/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 835e30f..6a14074 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -293,6 +293,60 @@ public class WorkerSinkTaskTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testWakeupInCommitSyncCausesRetry() throws Exception {
+        expectInitializeTask();
+        expectPollInitialAssignment();
+
+        final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);
+
+        sinkTask.close(new HashSet<>(partitions));
+        EasyMock.expectLastCall();
+
+        sinkTask.flush(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
+        EasyMock.expectLastCall();
+
+        // first one raises wakeup
+        consumer.commitSync(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
+        EasyMock.expectLastCall().andThrow(new WakeupException());
+
+        // we should retry and complete the commit
+        consumer.commitSync(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
+        EasyMock.expectLastCall();
+
+        EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
+
+        sinkTask.open(partitions);
+        EasyMock.expectLastCall();
+
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+                new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+                    @Override
+                    public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+                        rebalanceListener.getValue().onPartitionsRevoked(partitions);
+                        rebalanceListener.getValue().onPartitionsAssigned(partitions);
+                        return ConsumerRecords.empty();
+                    }
+                });
+
+        EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(partitions));
+
+        consumer.resume(Collections.singleton(TOPIC_PARTITION));
+        EasyMock.expectLastCall();
+
+        consumer.resume(Collections.singleton(TOPIC_PARTITION2));
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.poll(Long.MAX_VALUE); // poll for initial assignment
+        workerTask.poll(Long.MAX_VALUE); // now rebalance with the wakeup triggered
+
+        PowerMock.verifyAll();
+    }
 
     private void expectInitializeTask() throws Exception {
         PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);

