kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6661: Ensure sink connectors don’t resume consumer when task is paused
Date Thu, 15 Mar 2018 22:53:07 GMT
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e7ef719  KAFKA-6661: Ensure sink connectors don’t resume consumer when task is paused
e7ef719 is described below

commit e7ef719a5bc0d1276f0e9482d59b25406fda276b
Author: Randall Hauch <rhauch@gmail.com>
AuthorDate: Thu Mar 15 15:52:53 2018 -0700

    KAFKA-6661: Ensure sink connectors don’t resume consumer when task is paused
    
    Changed WorkerSinkTaskContext to only resume the consumer topic partitions when the connector/task is not in the paused state.
    
    The context tracks the set of topic partitions that are explicitly paused/resumed by the connector, and when the WorkerSinkTask resumes the tasks it currently resumes all topic partitions *except* those that are still explicitly paused in the context. Therefore, the change above should result in the desired behavior.
    
    Several debug statements were added to record when the context is called by the connector.
    
    This can be backported to older releases, since this bug goes back to 0.10 or 0.9.
    
    Author: Randall Hauch <rhauch@gmail.com>
    
    Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
    
    Closes #4716 from rhauch/kafka-6661
---
 .../kafka/connect/runtime/WorkerSinkTask.java      |  4 +--
 .../connect/runtime/WorkerSinkTaskContext.java     | 32 ++++++++++++++++++++--
 2 files changed, 31 insertions(+), 5 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 2995a4e..2ba785c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -130,7 +130,7 @@ class WorkerSinkTask extends WorkerTask {
         try {
             this.taskConfig = taskConfig.originalsStrings();
             this.consumer = createConsumer();
-            this.context = new WorkerSinkTaskContext(consumer);
+            this.context = new WorkerSinkTaskContext(consumer, this);
         } catch (Throwable t) {
             log.error("{} Task failed initialization and will not be started.", this, t);
             onFailure(t);
@@ -601,7 +601,7 @@ class WorkerSinkTask extends WorkerTask {
     private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            log.debug("{} Partitions assigned", WorkerSinkTask.this);
+            log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
             lastCommittedOffsets = new HashMap<>();
             currentOffsets = new HashMap<>();
             for (TopicPartition tp : partitions) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index 0878949..386f992 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -20,6 +20,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.IllegalWorkerStateException;
 import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -29,26 +31,32 @@ import java.util.Map;
 import java.util.Set;
 
 public class WorkerSinkTaskContext implements SinkTaskContext {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
     private Map<TopicPartition, Long> offsets;
     private long timeoutMs;
     private KafkaConsumer<byte[], byte[]> consumer;
+    private final WorkerSinkTask sinkTask;
     private final Set<TopicPartition> pausedPartitions;
     private boolean commitRequested;
 
-    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
+    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer, WorkerSinkTask sinkTask) {
         this.offsets = new HashMap<>();
         this.timeoutMs = -1L;
         this.consumer = consumer;
+        this.sinkTask = sinkTask;
         this.pausedPartitions = new HashSet<>();
     }
 
     @Override
     public void offset(Map<TopicPartition, Long> offsets) {
+        log.debug("{} Setting offsets for topic partitions {}", this, offsets);
         this.offsets.putAll(offsets);
     }
 
     @Override
     public void offset(TopicPartition tp, long offset) {
+        log.debug("{} Setting offset for topic partition {} to {}", this, tp, offset);
         offsets.put(tp, offset);
     }
 
@@ -66,6 +74,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
 
     @Override
     public void timeout(long timeoutMs) {
+        log.debug("{} Setting timeout to {} ms", this, timeoutMs);
         this.timeoutMs = timeoutMs;
     }
 
@@ -92,7 +101,12 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
         }
         try {
             Collections.addAll(pausedPartitions, partitions);
-            consumer.pause(Arrays.asList(partitions));
+            if (sinkTask.shouldPause()) {
+                log.debug("{} Connector is paused, so not pausing consumer's partitions {}", this, partitions);
+            } else {
+                consumer.pause(Arrays.asList(partitions));
+                log.debug("{} Pausing partitions {}. Connector is not paused.", this, partitions);
+            }
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e);
         }
@@ -105,7 +119,12 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
         }
         try {
             pausedPartitions.removeAll(Arrays.asList(partitions));
-            consumer.resume(Arrays.asList(partitions));
+            if (sinkTask.shouldPause()) {
+                log.debug("{} Connector is paused, so not resuming consumer's partitions {}", this, partitions);
+            } else {
+                consumer.resume(Arrays.asList(partitions));
+                log.debug("{} Resuming partitions: {}", this, partitions);
+            }
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e);
         }
@@ -117,6 +136,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
 
     @Override
     public void requestCommit() {
+        log.debug("{} Requesting commit", this);
         commitRequested = true;
     }
 
@@ -128,4 +148,10 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
         commitRequested = false;
     }
 
+    @Override
+    public String toString() {
+        return "WorkerSinkTaskContext{" +
+               "id=" + sinkTask.id +
+               '}';
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.

Mime
View raw message