kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-8065: restore original input record timestamp in forward() (#6393)
Date Sat, 09 Mar 2019 03:57:03 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new c509d25  KAFKA-8065: restore original input record timestamp in forward() (#6393)
c509d25 is described below

commit c509d253d75b683713eb99032d9e32c7510e4df7
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Fri Mar 8 19:24:26 2019 -0800

    KAFKA-8065: restore original input record timestamp in forward() (#6393)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>,
Guozhang Wang <guozhang@confluent.io>
---
 .../processor/internals/ProcessorContextImpl.java  | 12 +++--
 .../processor/internals/ProcessorTopologyTest.java | 55 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index f1ee81f..6300553 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -105,12 +105,15 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements
Re
     @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(final K key, final V value, final To to) {
-        toInternal.update(to);
-        if (toInternal.hasTimestamp()) {
-            recordContext.setTimestamp(toInternal.timestamp());
-        }
         final ProcessorNode previousNode = currentNode();
+        final long currentTimestamp = recordContext.timestamp;
+
         try {
+            toInternal.update(to);
+            if (toInternal.hasTimestamp()) {
+                recordContext.setTimestamp(toInternal.timestamp());
+            }
+
             final List<ProcessorNode<K, V>> children = (List<ProcessorNode<K,
V>>) currentNode().children();
             final String sendTo = toInternal.child();
             if (sendTo != null) {
@@ -131,6 +134,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements
Re
                 }
             }
         } finally {
+            recordContext.timestamp = currentTimestamp;
             setCurrentNode(previousNode);
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index d88d3b5..53a40c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -347,6 +347,32 @@ public class ProcessorTopologyTest {
     }
 
     @Test
+    public void shouldConsiderModifiedTimeStampsForMultipleProcessors() {
+        final int partition = 10;
+        driver = new TopologyTestDriver(createMultiProcessorTimestampTopology(partition),
props);
+
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 10L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1", partition, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 15L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1", partition, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 12L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1", partition, 22L);
+        assertNoOutputRecord(OUTPUT_TOPIC_1);
+        assertNoOutputRecord(OUTPUT_TOPIC_2);
+
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition, 30L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 25L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition, 30L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 22L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition, 32L);
+        assertNoOutputRecord(OUTPUT_TOPIC_1);
+        assertNoOutputRecord(OUTPUT_TOPIC_2);
+    }
+
+    @Test
     public void shouldConsiderHeaders() {
         final int partition = 10;
         driver = new TopologyTestDriver(createSimpleTopology(partition), props);
@@ -440,6 +466,16 @@ public class ProcessorTopologyTest {
             .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
     }
 
+    private Topology createMultiProcessorTimestampTopology(final int partition) {
+        return topology
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor", define(new FanOutTimestampProcessor("child1", "child2")),
"source")
+            .addProcessor("child1", define(new ForwardingProcessor()), "processor")
+            .addProcessor("child2", define(new TimestampProcessor()), "processor")
+            .addSink("sink1", OUTPUT_TOPIC_1, constantPartitioner(partition), "child1")
+            .addSink("sink2", OUTPUT_TOPIC_2, constantPartitioner(partition), "child2");
+    }
+
     private Topology createMultiplexingTopology() {
         return topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
@@ -533,6 +569,25 @@ public class ProcessorTopologyTest {
         }
     }
 
+    protected static class FanOutTimestampProcessor extends AbstractProcessor<String,
String> {
+        private final String firstChild;
+        private final String secondChild;
+
+        FanOutTimestampProcessor(final String firstChild,
+                                 final String secondChild) {
+            this.firstChild = firstChild;
+            this.secondChild = secondChild;
+        }
+
+        @Override
+        public void process(final String key, final String value) {
+            context().forward(key, value);
+            context().forward(key, value, To.child(firstChild).withTimestamp(context().timestamp()
+ 5));
+            context().forward(key, value, To.child(secondChild));
+            context().forward(key, value, To.all().withTimestamp(context().timestamp() +
2));
+        }
+    }
+
     protected static class AddHeaderProcessor extends AbstractProcessor<String, String>
{
         @Override
         public void process(final String key, final String value) {


Mime
View raw message