kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3817: KTableRepartitionMap publish old Change first, for non-count aggregates [Forced Update!]
Date Fri, 05 Aug 2016 23:26:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 465374fea -> 22f82abb5 (forced update)


KAFKA-3817: KTableRepartitionMap publish old Change first, for non-count aggregates

I affirm that the contribution is my original work and that I license the work to the project
under the project's open source license.

This cleans up misbehaviour that was introduce while fixing KAFKA-3817. It is impossible for
a non-count aggregate to be build, when the addition happens before the removal. IMHO making
sure that these details are correct is very important.

This PR has local test errors. It somehow fails the ResetIntegrationTest. It doesn't quite
appear to me why but it looks like this PR breaks it, especially because the error appears
with the ordering of the events. Still I am unable to find where I could have broken it. Maybe
not seems to fail on trunk aswell.

Author: jfilipiak <Jan.Filipiak@trivago.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1705 from Kaiserchen/KAFKA-3817-preserve-order-for-aggreagators

(cherry picked from 3dafb81da788294d4c2e9811f49437608e5b9ce8)


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/22f82abb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/22f82abb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/22f82abb

Branch: refs/heads/0.10.0
Commit: 22f82abb5ac342ed94cacbe375495509f395b5cd
Parents: ac994dd
Author: Jan Filipiak <Jan.Filipiak@trivago.com>
Authored: Fri Aug 5 10:03:22 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sat Aug 6 00:26:17 2016 +0100

----------------------------------------------------------------------
 .../kstream/internals/KTableRepartitionMap.java |  9 +++--
 .../internals/KGroupedTableImplTest.java        |  2 +-
 .../kstream/internals/KTableAggregateTest.java  | 38 +++++++++++---------
 3 files changed, 29 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/22f82abb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index ac7c00e..939a1df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -81,12 +81,15 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
             KeyValue<K1, V1> oldPair = change.oldValue == null ? null : mapper.apply(key,
change.oldValue);
 
             // if the selected repartition key or value is null, skip
-            if (newPair != null && newPair.key != null && newPair.value !=
null) {
-                context().forward(newPair.key, new Change<>(newPair.value, null));
-            }
+            // forward oldPair first, to be consistent with reduce and aggregate
             if (oldPair != null && oldPair.key != null && oldPair.value !=
null) {
                 context().forward(oldPair.key, new Change<>(null, oldPair.value));
             }
+
+            if (newPair != null && newPair.key != null && newPair.value !=
null) {
+                context().forward(newPair.key, new Change<>(newPair.value, null));
+            }
+            
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/22f82abb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index fc0451a..c47ae3f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -72,7 +72,7 @@ public class KGroupedTableImplTest {
         driver.process(input, "C", "yellow");
         driver.process(input, "D", "green");
 
-        final List<String> expected = Arrays.asList("green:1", "green:2", "blue:1",
"green:1", "yellow:1", "green:2");
+        final List<String> expected = Arrays.asList("green:1", "green:2", "green:1",
"blue:1", "yellow:1", "green:2");
         final List<String> actual = processorSupplier.processed;
         assertEquals(expected, actual);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/22f82abb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 75e007d..7928c38 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -34,6 +34,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+
 import java.io.File;
 import java.io.IOException;
 
@@ -74,8 +75,8 @@ public class KTableAggregateTest {
                 stringSerde,
                 "topic1-Canonized");
 
-        MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
-        table2.toStream().process(proc2);
+        MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
+        table2.toStream().process(proc);
 
         driver = new KStreamTestDriver(builder, stateDir);
 
@@ -91,12 +92,12 @@ public class KTableAggregateTest {
         assertEquals(Utils.mkList(
                 "A:0+1",
                 "B:0+2",
-                "A:0+1+3", "A:0+1+3-1",
-                "B:0+2+4", "B:0+2+4-2",
+                "A:0+1-1", "A:0+1-1+3",
+                "B:0+2-2", "B:0+2-2+4",
                 "C:0+5",
                 "D:0+6",
-                "B:0+2+4-2+7", "B:0+2+4-2+7-4",
-                "C:0+5+8", "C:0+5+8-5"), proc2.processed);
+                "B:0+2-2+4-4", "B:0+2-2+4-4+7",
+                "C:0+5-5", "C:0+5-5+8"), proc.processed);
     }
 
     @Test
@@ -109,11 +110,11 @@ public class KTableAggregateTest {
             @Override
                 public KeyValue<String, String> apply(String key, String value) {
                     if (key.equals("null")) {
-                        return KeyValue.pair(null, value + "s");
+                        return KeyValue.pair(null, value);
                     } else if (key.equals("NULL")) {
                         return null;
                     } else {
-                        return KeyValue.pair(value, value + "s");
+                        return KeyValue.pair(value, value);
                     }
                 }
             },
@@ -126,12 +127,14 @@ public class KTableAggregateTest {
                 stringSerde,
                 "topic1-Canonized");
 
-        MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
-        table2.toStream().process(proc2);
+        MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
+        table2.toStream().process(proc);
 
         driver = new KStreamTestDriver(builder, stateDir);
 
         driver.process(topic1, "A", "1");
+        driver.process(topic1, "A", null);
+        driver.process(topic1, "A", "1");
         driver.process(topic1, "B", "2");
         driver.process(topic1, "null", "3");
         driver.process(topic1, "B", "4");
@@ -139,11 +142,14 @@ public class KTableAggregateTest {
         driver.process(topic1, "B", "7");
 
         assertEquals(Utils.mkList(
-                "1:0+1s",
-                "2:0+2s",
-                "4:0+4s",
-                "2:0+2s-2s",
-                "7:0+7s",
-                "4:0+4s-4s"), proc2.processed);
+                "1:0+1",
+                "1:0+1-1",
+                "1:0+1-1+1",
+                "2:0+2",
+                // noop
+                "2:0+2-2", "4:0+4",
+                // noop
+                "4:0+4-4", "7:0+7"
+                ), proc.processed);
     }
 }


Mime
View raw message