kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3847: Use a separate producer per source task
Date Fri, 12 Aug 2016 21:06:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 39431f734 -> 208ecae23


KAFKA-3847: Use a separate producer per source task

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Jason Gustafson, Gwen Shapira

Closes #1727 from ewencp/kafka-3847-per-task-producers and squashes the following commits:

7d39724 [Ewen Cheslack-Postava] Add timeout for closing producers.
98ec7f6 [Ewen Cheslack-Postava] KAFKA-3847: Use a separate producer per source task


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/208ecae2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/208ecae2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/208ecae2

Branch: refs/heads/trunk
Commit: 208ecae23994812d2a707b776de6b95d9f591bb7
Parents: 39431f7
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Fri Aug 12 14:06:37 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Fri Aug 12 14:06:37 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/connect/runtime/Worker.java    | 16 +++++++---------
 .../kafka/connect/runtime/WorkerSourceTask.java |  2 +-
 .../connect/runtime/WorkerSourceTaskTest.java   | 20 ++++++++++++++++++++
 3 files changed, 28 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/208ecae2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index e39a7e2..a0664ad 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -71,10 +71,10 @@ public class Worker {
     private final Converter internalKeyConverter;
     private final Converter internalValueConverter;
     private final OffsetBackingStore offsetBackingStore;
+    private final Map<String, Object> producerProps;
 
     private HashMap<String, WorkerConnector> connectors = new HashMap<>();
     private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
-    private KafkaProducer<byte[], byte[]> producer;
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
 
     public Worker(String workerId,
@@ -96,16 +96,11 @@ public class Worker {
 
         this.offsetBackingStore = offsetBackingStore;
         this.offsetBackingStore.configure(config);
-    }
 
-    public void start() {
-        log.info("Worker starting");
-
-        Map<String, Object> producerProps = new HashMap<>();
+        producerProps = new HashMap<>();
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG),
","));
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-
         // These settings are designed to ensure there is no data loss. They *may* be overridden
via configs passed to the
         // worker, but this may compromise the delivery guarantees of Kafka Connect.
         producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, ((Integer) Integer.MAX_VALUE).toString());
@@ -113,10 +108,12 @@ public class Worker {
         producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, ((Long) Long.MAX_VALUE).toString());
         producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
         producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
-
+        // User-specified overrides
         producerProps.putAll(config.originalsWithPrefix("producer."));
+    }
 
-        producer = new KafkaProducer<>(producerProps);
+    public void start() {
+        log.info("Worker starting");
 
         offsetBackingStore.start();
         sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config);
@@ -349,6 +346,7 @@ public class Worker {
                     internalKeyConverter, internalValueConverter);
             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore,
id.connector(),
                     internalKeyConverter, internalValueConverter);
+            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
             return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState,
keyConverter,
                      valueConverter, producer, offsetReader, offsetWriter, config, time);
         } else if (task instanceof SinkTask) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/208ecae2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 6d91b36..260015e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -115,7 +115,7 @@ class WorkerSourceTask extends WorkerTask {
     }
 
     protected void close() {
-        // nothing to do
+        producer.close(30, TimeUnit.SECONDS);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/208ecae2/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 9854f22..30c7118 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -156,6 +156,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         statusListener.onShutdown(taskId);
         EasyMock.expectLastCall();
 
+        producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
@@ -195,6 +198,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         statusListener.onShutdown(taskId);
         EasyMock.expectLastCall();
 
+        producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
@@ -238,6 +244,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         statusListener.onShutdown(taskId);
         EasyMock.expectLastCall();
 
+        producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
@@ -280,6 +289,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         EasyMock.expectLastCall();
         expectOffsetFlush(true);
 
+        producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
@@ -317,6 +329,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         statusListener.onShutdown(taskId);
         EasyMock.expectLastCall();
 
+        producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+        EasyMock.expectLastCall();
 
         PowerMock.replayAll();
 
@@ -356,6 +370,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         statusListener.onShutdown(taskId);
         EasyMock.expectLastCall();
 
+        producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
@@ -499,6 +516,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         statusListener.onShutdown(taskId);
         EasyMock.expectLastCall();
 
+        producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);


Mime
View raw message