kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: MINOR: Wakeups propagated from commitOffsets in WorkerSinkTask should be caught
Date Mon, 26 Sep 2016 21:54:40 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 4104f014e -> d6b3ff142


MINOR: Wakeups propagated from commitOffsets in WorkerSinkTask should be caught

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1907 from hachikuji/catch-wakeup-worker-sink-task

(cherry picked from commit b75245cfbbefc712103b9329da0f27a205baa6aa)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d6b3ff14
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d6b3ff14
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d6b3ff14

Branch: refs/heads/0.10.1
Commit: d6b3ff142a8ab19264b7af76bda7ddaa8c4bea1d
Parents: 4104f01
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon Sep 26 14:54:01 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Sep 26 14:54:32 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/WorkerSinkTask.java   | 84 ++++++++++----------
 .../connect/runtime/WorkerSinkTaskTest.java     | 36 ++++-----
 2 files changed, 60 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b3ff14/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 624b032..4181799 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -149,26 +149,39 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     protected void iteration() {
-        long now = time.milliseconds();
+        try {
+            long now = time.milliseconds();
 
-        // Maybe commit
-        if (!committing && now >= nextCommit) {
-            commitOffsets(now, false);
-            nextCommit += workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
-        }
+            // Maybe commit
+            if (!committing && now >= nextCommit) {
+                commitOffsets(now, false);
+                nextCommit += workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+            }
 
-        // Check for timed out commits
-        long commitTimeout = commitStarted + workerConfig.getLong(
-                WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
-        if (committing && now >= commitTimeout) {
-            log.warn("Commit of {} offsets timed out", this);
-            commitFailures++;
-            committing = false;
-        }
+            // Check for timed out commits
+            long commitTimeout = commitStarted + workerConfig.getLong(
+                    WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+            if (committing && now >= commitTimeout) {
+                log.warn("Commit of {} offsets timed out", this);
+                commitFailures++;
+                committing = false;
+            }
 
-        // And process messages
-        long timeoutMs = Math.max(nextCommit - now, 0);
-        poll(timeoutMs);
+            // And process messages
+            long timeoutMs = Math.max(nextCommit - now, 0);
+            poll(timeoutMs);
+        } catch (WakeupException we) {
+            log.trace("{} consumer woken up", id);
+
+            if (isStopping())
+                return;
+
+            if (shouldPause()) {
+                pauseAll();
+            } else if (!pausedForRedelivery) {
+                resumeAll();
+            }
+        }
     }
 
     private void onCommitCompleted(Throwable error, long seqno) {
@@ -211,33 +224,20 @@ class WorkerSinkTask extends WorkerTask {
 
     /** Poll for new messages with the given timeout. Should only be invoked by the worker
thread. */
     protected void poll(long timeoutMs) {
-        try {
-            rewind();
-            long retryTimeout = context.timeout();
-            if (retryTimeout > 0) {
-                timeoutMs = Math.min(timeoutMs, retryTimeout);
-                context.timeout(-1L);
-            }
-
-            log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
-            ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
-            assert messageBatch.isEmpty() || msgs.isEmpty();
-            log.trace("{} polling returned {} messages", id, msgs.count());
+        rewind();
+        long retryTimeout = context.timeout();
+        if (retryTimeout > 0) {
+            timeoutMs = Math.min(timeoutMs, retryTimeout);
+            context.timeout(-1L);
+        }
 
-            convertMessages(msgs);
-            deliverMessages();
-        } catch (WakeupException we) {
-            log.trace("{} consumer woken up", id);
+        log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
+        ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
+        assert messageBatch.isEmpty() || msgs.isEmpty();
+        log.trace("{} polling returned {} messages", id, msgs.count());
 
-            if (isStopping())
-                return;
-
-            if (shouldPause()) {
-                pauseAll();
-            } else if (!pausedForRedelivery) {
-                resumeAll();
-            }
-        }
+        convertMessages(msgs);
+        deliverMessages();
     }
 
     private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> offsets, int seqno)
{

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b3ff14/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index dbb3f8d..ca218c3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -148,7 +148,7 @@ public class WorkerSinkTaskTest {
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
-        workerTask.poll(Long.MAX_VALUE);
+        workerTask.iteration();
 
         PowerMock.verifyAll();
     }
@@ -197,14 +197,14 @@ public class WorkerSinkTaskTest {
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
-        workerTask.poll(Long.MAX_VALUE); // initial assignment
-        workerTask.poll(Long.MAX_VALUE); // fetch some data
+        workerTask.iteration(); // initial assignment
+        workerTask.iteration(); // fetch some data
         workerTask.transitionTo(TargetState.PAUSED);
-        workerTask.poll(Long.MAX_VALUE); // wakeup
-        workerTask.poll(Long.MAX_VALUE); // now paused
+        workerTask.iteration(); // wakeup
+        workerTask.iteration(); // now paused
         workerTask.transitionTo(TargetState.STARTED);
-        workerTask.poll(Long.MAX_VALUE); // wakeup
-        workerTask.poll(Long.MAX_VALUE); // now unpaused
+        workerTask.iteration(); // wakeup
+        workerTask.iteration(); // now unpaused
 
         PowerMock.verifyAll();
     }
@@ -241,9 +241,9 @@ public class WorkerSinkTaskTest {
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
-        workerTask.poll(Long.MAX_VALUE);
-        workerTask.poll(Long.MAX_VALUE);
-        workerTask.poll(Long.MAX_VALUE);
+        workerTask.iteration();
+        workerTask.iteration();
+        workerTask.iteration();
 
         PowerMock.verifyAll();
     }
@@ -260,9 +260,9 @@ public class WorkerSinkTaskTest {
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
-        workerTask.poll(Long.MAX_VALUE);
+        workerTask.iteration();
         try {
-            workerTask.poll(Long.MAX_VALUE);
+            workerTask.iteration();
             fail("Poll should have raised the rebalance exception");
         } catch (RuntimeException e) {
             assertEquals(exception, e);
@@ -283,9 +283,9 @@ public class WorkerSinkTaskTest {
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
-        workerTask.poll(Long.MAX_VALUE);
+        workerTask.iteration();
         try {
-            workerTask.poll(Long.MAX_VALUE);
+            workerTask.iteration();
             fail("Poll should have raised the rebalance exception");
         } catch (RuntimeException e) {
             assertEquals(exception, e);
@@ -343,8 +343,8 @@ public class WorkerSinkTaskTest {
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
-        workerTask.poll(Long.MAX_VALUE); // poll for initial assignment
-        workerTask.poll(Long.MAX_VALUE); // now rebalance with the wakeup triggered
+        workerTask.iteration(); // poll for initial assignment
+        workerTask.iteration(); // now rebalance with the wakeup triggered
 
         PowerMock.verifyAll();
     }
@@ -363,7 +363,7 @@ public class WorkerSinkTaskTest {
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
-        workerTask.poll(Long.MAX_VALUE);
+        workerTask.iteration();
 
         SinkRecord record = records.getValue().iterator().next();
 
@@ -391,7 +391,7 @@ public class WorkerSinkTaskTest {
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
-        workerTask.poll(Long.MAX_VALUE);
+        workerTask.iteration();
 
         SinkRecord record = records.getValue().iterator().next();
 


Mime
View raw message