kafka-commits mailing list archives

From ewe...@apache.org
Subject kafka git commit: KAFKA-5567: Connect sink worker should commit offsets of original topic partitions
Date Wed, 16 Aug 2017 21:43:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 eda1bf6c1 -> bdeb98c86


KAFKA-5567: Connect sink worker should commit offsets of original topic partitions

Author: Konstantine Karantasis <konstantine@confluent.io>

Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #3499 from kkonstantine/KAFKA-5567-With-transformations-that-mutate-the-topic-partition-committing-offsets-should-to-refer-to-the-original-topic-partition

(cherry picked from commit 72eacbea5b831096a6cc9a4fa42401bc74a88a80)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
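For context, here is a minimal sketch (not part of this commit) of the kind of transformation this fix accounts for: a hypothetical topic-renaming SMT, similar in spirit to RegexRouter, that rewrites the topic of each record. After such a transform, the SinkRecord no longer identifies the TopicPartition the consumer actually read from, which is why WorkerSinkTask must track offsets from the original record rather than the transformed one. The class name PrefixRouter and its "prefix" config are illustrative only.

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.Transformation;

/**
 * Hypothetical topic-renaming SMT used only to illustrate the scenario this commit fixes:
 * once a transformation rewrites the topic, committing offsets keyed by the transformed
 * record would point at a (topic, partition) the consumer was never assigned.
 */
public class PrefixRouter implements Transformation<SinkRecord> {

    private String prefix = "newtopic_";

    @Override
    public void configure(Map<String, ?> configs) {
        Object p = configs.get("prefix");
        if (p != null)
            prefix = p.toString();
    }

    @Override
    public SinkRecord apply(SinkRecord record) {
        // Same key, value and offset, but the topic is rewritten; the original
        // consumer TopicPartition can no longer be derived from the result.
        return record.newRecord(
                prefix + record.topic(),
                record.kafkaPartition(),
                record.keySchema(),
                record.key(),
                record.valueSchema(),
                record.value(),
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef().define("prefix", ConfigDef.Type.STRING, "newtopic_",
                ConfigDef.Importance.MEDIUM, "Prefix prepended to the topic name.");
    }

    @Override
    public void close() {
    }
}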


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bdeb98c8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bdeb98c8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bdeb98c8

Branch: refs/heads/0.11.0
Commit: bdeb98c866628a8a0787c3061b6473d9b6e5bbbe
Parents: eda1bf6
Author: Konstantine Karantasis <konstantine@confluent.io>
Authored: Wed Aug 16 14:43:29 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Wed Aug 16 14:43:43 2017 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/WorkerSinkTask.java   | 19 ++++--
 .../connect/runtime/WorkerSinkTaskTest.java     | 68 +++++++++++++++++++-
 2 files changed, 79 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bdeb98c8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 9d9f240..307dfd5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -67,6 +67,7 @@ class WorkerSinkTask extends WorkerTask {
     private final List<SinkRecord> messageBatch;
     private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
     private Map<TopicPartition, OffsetAndMetadata> currentOffsets;
+    private final Map<TopicPartition, OffsetAndMetadata> origOffsets;
     private RuntimeException rebalanceException;
     private long nextCommit;
     private int commitSeqno;
@@ -95,6 +96,7 @@ class WorkerSinkTask extends WorkerTask {
         this.time = time;
         this.messageBatch = new ArrayList<>();
         this.currentOffsets = new HashMap<>();
+        this.origOffsets = new HashMap<>();
         this.pausedForRedelivery = false;
         this.rebalanceException = null;
         this.nextCommit = time.milliseconds() +
@@ -415,13 +417,14 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
+        origOffsets.clear();
         for (ConsumerRecord<byte[], byte[]> msg : msgs) {
             log.trace("{} Consuming and converting message in topic '{}' partition {} at
offset {} and timestamp {}",
                     this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp());
             SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
             SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
             Long timestamp = ConnectUtils.checkAndConvertTimestamp(msg.timestamp());
-            SinkRecord record = new SinkRecord(msg.topic(), msg.partition(),
+            SinkRecord origRecord = new SinkRecord(msg.topic(), msg.partition(),
                     keyAndSchema.schema(), keyAndSchema.value(),
                     valueAndSchema.schema(), valueAndSchema.value(),
                     msg.offset(),
@@ -429,9 +432,13 @@ class WorkerSinkTask extends WorkerTask {
                     msg.timestampType());
             log.trace("{} Applying transformations to record in topic '{}' partition {} at
offset {} and timestamp {} with key {} and value {}",
                     this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(),
valueAndSchema.value());
-            record = transformationChain.apply(record);
-            if (record != null) {
-                messageBatch.add(record);
+            SinkRecord transRecord = transformationChain.apply(origRecord);
+            origOffsets.put(
+                    new TopicPartition(origRecord.topic(), origRecord.kafkaPartition()),
+                    new OffsetAndMetadata(origRecord.kafkaOffset() + 1)
+            );
+            if (transRecord != null) {
+                messageBatch.add(transRecord);
             } else {
                 log.trace("{} Transformations returned null, so dropping record in topic
'{}' partition {} at offset {} and timestamp {} with key {} and value {}",
                         this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(),
valueAndSchema.value());
@@ -455,9 +462,7 @@ class WorkerSinkTask extends WorkerTask {
             // Since we reuse the messageBatch buffer, ensure we give the task its own copy
             log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
             task.put(new ArrayList<>(messageBatch));
-            for (SinkRecord record : messageBatch)
-                currentOffsets.put(new TopicPartition(record.topic(), record.kafkaPartition()),
-                        new OffsetAndMetadata(record.kafkaOffset() + 1));
+            currentOffsets.putAll(origOffsets);
             messageBatch.clear();
             // If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
             // the task had not explicitly paused

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdeb98c8/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index c0a7e6c..fac1b79 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -824,6 +824,57 @@ public class WorkerSinkTaskTest {
     }
 
     @Test
+    public void testDeliveryWithMutatingTransform() throws Exception {
+        expectInitializeTask();
+
+        expectPollInitialAssignment();
+
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1, "newtopic_");
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+        offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        sinkTask.preCommit(offsets);
+        EasyMock.expectLastCall().andReturn(offsets);
+
+        final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
+        consumer.commitAsync(EasyMock.eq(offsets), EasyMock.capture(callback));
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() throws Throwable {
+                callback.getValue().onComplete(offsets, null);
+                return null;
+            }
+        });
+
+        expectConsumerPoll(0);
+        sinkTask.put(Collections.<SinkRecord>emptyList());
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+
+        workerTask.iteration(); // initial assignment
+
+        workerTask.iteration(); // first record delivered
+
+        sinkTaskContext.getValue().requestCommit();
+        assertTrue(sinkTaskContext.getValue().isCommitRequested());
+        assertNotEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+        workerTask.iteration(); // triggers the commit
+        assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared
+        assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+        assertEquals(0, workerTask.commitFailures());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testMissingTimestampPropagation() throws Exception {
         expectInitializeTask();
         expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, TimestampType.CREATE_TIME);
@@ -980,6 +1031,10 @@ public class WorkerSinkTaskTest {
     }
 
     private void expectConversionAndTransformation(final int numMessages) {
+        expectConversionAndTransformation(numMessages, null);
+    }
+
+    private void expectConversionAndTransformation(final int numMessages, final String topicPrefix) {
         EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).times(numMessages);
         EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).times(numMessages);
 
@@ -988,7 +1043,18 @@ public class WorkerSinkTaskTest {
                 .andAnswer(new IAnswer<SinkRecord>() {
                     @Override
                     public SinkRecord answer() {
-                        return recordCapture.getValue();
+                        SinkRecord origRecord = recordCapture.getValue();
+                        return topicPrefix != null && !topicPrefix.isEmpty()
+                               ? origRecord.newRecord(
+                                       topicPrefix + origRecord.topic(),
+                                       origRecord.kafkaPartition(),
+                                       origRecord.keySchema(),
+                                       origRecord.key(),
+                                       origRecord.valueSchema(),
+                                       origRecord.value(),
+                                       origRecord.timestamp()
+                               )
+                               : origRecord;
                     }
                 }).times(numMessages);
     }

