kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3260 - Added SourceTask.commitRecord
Date Tue, 15 Mar 2016 21:32:33 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fd6efbe0b -> 6eacc0de3


KAFKA-3260 - Added SourceTask.commitRecord

Added commitRecord(SourceRecord record) to SourceTask. This method is called during the callback
from producer.send() when the message has been sent successfully. Added commitTaskRecord(SourceRecord
record) to WorkerSourceTask to handle calling commitRecord on the SourceTask. Updated tests
for calls to commitRecord.

Author: Jeremy Custenborder <jcustenborder@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #950 from jcustenborder/KAFKA-3260


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6eacc0de
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6eacc0de
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6eacc0de

Branch: refs/heads/trunk
Commit: 6eacc0de303e4d29e083b89c1f53615c1dfa291e
Parents: fd6efbe
Author: Jeremy Custenborder <jcustenborder@gmail.com>
Authored: Tue Mar 15 14:32:22 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Tue Mar 15 14:32:22 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/connect/source/SourceTask.java    | 16 ++++++++++++++++
 .../kafka/connect/runtime/WorkerSourceTask.java    |  9 +++++++++
 .../connect/runtime/WorkerSourceTaskTest.java      | 17 +++++++++++++++++
 3 files changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6eacc0de/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
index 802fcdd..c508ec1 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
@@ -79,4 +79,20 @@ public abstract class SourceTask implements Task {
      * {@link java.nio.channels.Selector#wakeup() wakeup()} to interrupt any ongoing requests.
      */
     public abstract void stop();
+
+    /**
+     * <p>
+     * Commit an individual {@link SourceRecord} when the callback from the producer client
is received.
+     * </p>
+     * <p>
+     * SourceTasks are not required to implement this functionality; Kafka Connect will record
offsets
+     * automatically. This hook is provided for systems that also need to store offsets internally
+     * in their own system.
+     * </p>
+     * @param record {@link SourceRecord} that was successfully sent via the producer.
+     * @throws InterruptedException
+     */
+    public void commitRecord(SourceRecord record) throws InterruptedException {
+        // This space intentionally left blank.
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6eacc0de/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 0014be8..3a43f96 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -207,6 +207,7 @@ class WorkerSourceTask extends WorkerTask {
                                     log.trace("Wrote record successfully: topic {} partition
{} offset {}",
                                             recordMetadata.topic(), recordMetadata.partition(),
                                             recordMetadata.offset());
+                                    commitTaskRecord(record);
                                 }
                                 recordSent(producerRecord);
                             }
@@ -226,6 +227,14 @@ class WorkerSourceTask extends WorkerTask {
         return true;
     }
 
+    private void commitTaskRecord(SourceRecord record) {
+        try {
+            task.commitRecord(record);
+        } catch (InterruptedException e) {
+            log.error("Exception thrown", e);
+        }
+    }
+
     private synchronized void recordSent(final ProducerRecord<byte[], byte[]> record)
{
         ProducerRecord<byte[], byte[]> removed = outstandingMessages.remove(record);
         // While flushing, we may also see callbacks for items in the backlog

http://git-wip-us.apache.org/repos/asf/kafka/blob/6eacc0de/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 9b0133a..ece2985 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -133,6 +133,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         final CountDownLatch pollLatch = expectPolls(10);
         // In this test, we don't flush, so nothing goes any further than the offset writer
 
+        expectCommitRecord(10);
+
         sourceTask.stop();
         EasyMock.expectLastCall();
         expectOffsetFlush(true);
@@ -203,10 +205,13 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         sourceTask.stop();
         EasyMock.expectLastCall();
         expectOffsetFlush(true);
+        
+        expectCommitRecord(1);
 
         statusListener.onShutdown(taskId);
         EasyMock.expectLastCall();
 
+
         PowerMock.replayAll();
 
         workerTask.initialize(EMPTY_TASK_PROPS);
@@ -233,6 +238,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         // We'll wait for some data, then trigger a flush
         final CountDownLatch pollLatch = expectPolls(1);
+        expectCommitRecord(1);
         expectOffsetFlush(true);
 
         sourceTask.stop();
@@ -254,6 +260,13 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         PowerMock.verifyAll();
     }
 
+    private void expectCommitRecord(int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class));
+            EasyMock.expectLastCall();
+        }
+    }
+
     @Test
     public void testSendRecordsConvertsData() throws Exception {
         createWorkerTask();
@@ -264,6 +277,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
 
+        expectCommitRecord(records.size());
+
         PowerMock.replayAll();
 
         Whitebox.setInternalState(workerTask, "toSend", records);
@@ -292,6 +307,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         expectSendRecordOnce(true);
         expectSendRecordOnce(false);
 
+        expectCommitRecord(3);
+
         PowerMock.replayAll();
 
         // Try to send 3, make first pass, second fail. Should save last two


Mime
View raw message