kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-2886: Handle sink task rebalance failures by stopping worker task
Date Fri, 15 Jan 2016 17:29:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 37be6d98d -> 6dc974312


KAFKA-2886: Handle sink task rebalance failures by stopping worker task

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Liquan Pei <liquanpei@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #767 from hachikuji/KAFKA-2886


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6dc97431
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6dc97431
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6dc97431

Branch: refs/heads/trunk
Commit: 6dc974312590f089c9e0ac27e8eb9b848caf32b5
Parents: 37be6d9
Author: Jason Gustafson <jason@confluent.io>
Authored: Fri Jan 15 09:28:43 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Fri Jan 15 09:28:43 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/connect/sink/SinkTask.java |  7 +-
 .../kafka/connect/runtime/WorkerSinkTask.java   | 44 +++++++--
 .../connect/runtime/WorkerSinkTaskThread.java   | 34 ++++---
 .../connect/runtime/WorkerSinkTaskTest.java     | 94 ++++++++++++++++++++
 4 files changed, 151 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6dc97431/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
index 7e793c8..85ce88a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
@@ -80,7 +80,7 @@ public abstract class SinkTask implements Task {
      * re-assignment. In partition re-assignment, some new partitions may be assigned to the SinkTask.
      * The SinkTask needs to create writers and perform necessary recovery for the newly assigned partitions.
      * This method will be called after partition re-assignment completes and before the SinkTask starts
-     * fetching data.
+     * fetching data. Note that any errors raised from this method will cause the task to stop.
      * @param partitions The list of partitions that are now assigned to the task (may include
      *                   partitions previously assigned to the task)
      */
@@ -88,9 +88,10 @@ public abstract class SinkTask implements Task {
     }
 
     /**
-     * The SinkTask use this method to close writers and commit offsets for partitions that are
+     * The SinkTask use this method to close writers and commit offsets for partitions that are no
      * longer assigned to the SinkTask. This method will be called before a rebalance operation starts
-     * and after the SinkTask stops fetching data.
+     * and after the SinkTask stops fetching data. Note that any errors raised from this method will cause
+     * the task to stop.
      * @param partitions The list of partitions that were assigned to the consumer on the last
      *                   rebalance
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/6dc97431/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index a67d0af..f48a734 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -71,6 +71,7 @@ class WorkerSinkTask implements WorkerTask {
     private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
     private Map<TopicPartition, OffsetAndMetadata> currentOffsets;
     private boolean pausedForRedelivery;
+    private RuntimeException rebalanceException;
 
     public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
                           Converter keyConverter, Converter valueConverter, Time time) {
@@ -84,6 +85,7 @@ class WorkerSinkTask implements WorkerTask {
         this.messageBatch = new ArrayList<>();
         this.currentOffsets = new HashMap<>();
         this.pausedForRedelivery = false;
+        this.rebalanceException = null;
     }
 
     @Override
@@ -145,7 +147,7 @@ class WorkerSinkTask implements WorkerTask {
         // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions
         // to work with. Any rewinding will be handled immediately when polling starts.
         try {
-            consumer.poll(0);
+            pollConsumer(0);
         } catch (WakeupException e) {
             log.error("Sink task {} was stopped before completing join group. Task initialization and start is being skipped", this);
             return false;
@@ -168,7 +170,7 @@ class WorkerSinkTask implements WorkerTask {
             }
 
             log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
-            ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
+            ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
             assert messageBatch.isEmpty() || msgs.isEmpty();
             log.trace("{} polling returned {} messages", id, msgs.count());
 
@@ -237,6 +239,19 @@ class WorkerSinkTask implements WorkerTask {
                 '}';
     }
 
+    private ConsumerRecords<byte[], byte[]> pollConsumer(long timeoutMs) {
+        ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
+
+        // Exceptions raised from the task during a rebalance should be rethrown to stop the worker
+        if (rebalanceException != null) {
+            RuntimeException e = rebalanceException;
+            rebalanceException = null;
+            throw e;
+        }
+
+        return msgs;
+    }
+
     private KafkaConsumer<byte[], byte[]> createConsumer() {
         // Include any unknown worker configs so consumer configs can be set globally on the worker
         // and through to the task
@@ -332,6 +347,9 @@ class WorkerSinkTask implements WorkerTask {
     private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            if (rebalanceException != null)
+                return;
+
             lastCommittedOffsets = new HashMap<>();
             currentOffsets = new HashMap<>();
             for (TopicPartition tp : partitions) {
@@ -365,16 +383,30 @@ class WorkerSinkTask implements WorkerTask {
             // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon
             // task start. Since this callback gets invoked during that initial setup before we've started the task, we
             // need to guard against invoking the user's callback method during that period.
-            if (started)
-                task.onPartitionsAssigned(partitions);
+            if (started) {
+                try {
+                    task.onPartitionsAssigned(partitions);
+                } catch (RuntimeException e) {
+                    // The consumer swallows exceptions raised in the rebalance listener, so we need to store
+                    // exceptions and rethrow when poll() returns.
+                    rebalanceException = e;
+                }
+            }
         }
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
             if (started) {
-                task.onPartitionsRevoked(partitions);
-                commitOffsets(true, -1);
+                try {
+                    task.onPartitionsRevoked(partitions);
+                    commitOffsets(true, -1);
+                } catch (RuntimeException e) {
+                    // The consumer swallows exceptions raised in the rebalance listener, so we need to store
+                    // exceptions and rethrow when poll() returns.
+                    rebalanceException = e;
+                }
             }
+
             // Make sure we don't have any leftover data since offsets will be reset to committed positions
             messageBatch.clear();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6dc97431/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
index b65efa8..93e210a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
@@ -67,11 +67,9 @@ class WorkerSinkTaskThread extends ShutdownableThread {
 
         // Maybe commit
         if (!committing && now >= nextCommit) {
-            synchronized (this) {
-                committing = true;
-                commitSeqno += 1;
-                commitStarted = now;
-            }
+            committing = true;
+            commitSeqno += 1;
+            commitStarted = now;
             task.commitOffsets(false, commitSeqno);
             nextCommit += task.workerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
         }
@@ -91,22 +89,20 @@ class WorkerSinkTaskThread extends ShutdownableThread {
     }
 
     public void onCommitCompleted(Throwable error, long seqno) {
-        synchronized (this) {
-            if (commitSeqno != seqno) {
-            log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}",
-                        this,
-                        seqno, commitSeqno);
+        if (commitSeqno != seqno) {
+            log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}",
+                    this,
+                    seqno, commitSeqno);
+        } else {
+            if (error != null) {
+                log.error("Commit of {} offsets threw an unexpected exception: ", task, error);
+                commitFailures++;
             } else {
-                if (error != null) {
-                    log.error("Commit of {} offsets threw an unexpected exception: ", task, error);
-                    commitFailures++;
-                } else {
-                    log.debug("Finished {} offset commit successfully in {} ms",
-                            task, task.time().milliseconds() - commitStarted);
-                    commitFailures = 0;
-                }
-                committing = false;
+                log.debug("Finished {} offset commit successfully in {} ms",
+                        task, task.time().milliseconds() - commitStarted);
+                commitFailures = 0;
             }
+            committing = false;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6dc97431/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index d5eaace..305a61e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
@@ -55,6 +56,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(WorkerSinkTask.class)
 @PowerMockIgnore("javax.management.*")
@@ -156,6 +160,48 @@ public class WorkerSinkTaskTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testErrorInRebalancePartitionRevocation() throws Exception {
+        RuntimeException exception = new RuntimeException("Revocation error");
+
+        expectInitializeTask();
+        expectRebalanceRevocationError(exception);
+
+        PowerMock.replayAll();
+
+        workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
+        try {
+            workerTask.poll(Long.MAX_VALUE);
+            fail("Poll should have raised the rebalance exception");
+        } catch (RuntimeException e) {
+            assertEquals(exception, e);
+        }
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testErrorInRebalancePartitionAssignment() throws Exception {
+        RuntimeException exception = new RuntimeException("Assignment error");
+
+        expectInitializeTask();
+        expectRebalanceAssignmentError(exception);
+
+        PowerMock.replayAll();
+
+        workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
+        try {
+            workerTask.poll(Long.MAX_VALUE);
+            fail("Poll should have raised the rebalance exception");
+        } catch (RuntimeException e) {
+            assertEquals(exception, e);
+        }
+
+        PowerMock.verifyAll();
+    }
+
 
     private void expectInitializeTask() throws Exception {
         PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
@@ -183,6 +229,54 @@ public class WorkerSinkTaskTest {
         PowerMock.expectLastCall();
     }
 
+    private void expectRebalanceRevocationError(RuntimeException e) {
+        final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2);
+
+        sinkTask.onPartitionsRevoked(partitions);
+        EasyMock.expectLastCall().andThrow(e);
+
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+                new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+                    @Override
+                    public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+                        rebalanceListener.getValue().onPartitionsRevoked(partitions);
+                        return ConsumerRecords.empty();
+                    }
+                });
+    }
+
+    private void expectRebalanceAssignmentError(RuntimeException e) {
+        final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2);
+
+        sinkTask.onPartitionsRevoked(partitions);
+        EasyMock.expectLastCall();
+
+        sinkTask.flush(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
+        EasyMock.expectLastCall();
+
+        consumer.commitSync(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
+        EasyMock.expectLastCall();
+
+        workerThread.onCommitCompleted(EasyMock.<Throwable>isNull(), EasyMock.anyLong());
+        EasyMock.expectLastCall();
+
+        EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
+
+        sinkTask.onPartitionsAssigned(partitions);
+        EasyMock.expectLastCall().andThrow(e);
+
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+                new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+                    @Override
+                    public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+                        rebalanceListener.getValue().onPartitionsRevoked(partitions);
+                        rebalanceListener.getValue().onPartitionsAssigned(partitions);
+                        return ConsumerRecords.empty();
+                    }
+                });
+    }
+
     private void expectConsumerPoll(final int numMessages) {
         EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {


Mime
View raw message