kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: resolve conflicts from cherry-picking 3dafb81da788294d4c2e9811f49437608e5b9ce8
Date Fri, 05 Aug 2016 17:36:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 ac994dd76 -> 01b6bd227


resolve conflicts from cherry-picking 3dafb81da788294d4c2e9811f49437608e5b9ce8


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/01b6bd22
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/01b6bd22
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/01b6bd22

Branch: refs/heads/0.10.0
Commit: 01b6bd227afeaf208c2115857ba2de5ad6fac1cd
Parents: ac994dd
Author: Jan Filipiak <Jan.Filipiak@trivago.com>
Authored: Fri Aug 5 10:03:22 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Aug 5 10:35:43 2016 -0700

----------------------------------------------------------------------
 .../kstream/internals/KTableRepartitionMap.java |  9 +++--
 .../internals/KGroupedTableImplTest.java        |  2 +-
 .../kstream/internals/KTableAggregateTest.java  | 38 +++++++++++---------
 3 files changed, 29 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/01b6bd22/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index ac7c00e..939a1df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -81,12 +81,15 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
             KeyValue<K1, V1> oldPair = change.oldValue == null ? null : mapper.apply(key, change.oldValue);
 
             // if the selected repartition key or value is null, skip
-            if (newPair != null && newPair.key != null && newPair.value != null) {
-                context().forward(newPair.key, new Change<>(newPair.value, null));
-            }
+            // forward oldPair first, to be consistent with reduce and aggregate
             if (oldPair != null && oldPair.key != null && oldPair.value != null) {
                 context().forward(oldPair.key, new Change<>(null, oldPair.value));
             }
+
+            if (newPair != null && newPair.key != null && newPair.value != null) {
+                context().forward(newPair.key, new Change<>(newPair.value, null));
+            }
+            
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/01b6bd22/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index fc0451a..c47ae3f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -72,7 +72,7 @@ public class KGroupedTableImplTest {
         driver.process(input, "C", "yellow");
         driver.process(input, "D", "green");
 
-        final List<String> expected = Arrays.asList("green:1", "green:2", "blue:1", "green:1", "yellow:1", "green:2");
+        final List<String> expected = Arrays.asList("green:1", "green:2", "green:1", "blue:1", "yellow:1", "green:2");
         final List<String> actual = processorSupplier.processed;
         assertEquals(expected, actual);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/01b6bd22/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 75e007d..7928c38 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -34,6 +34,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+
 import java.io.File;
 import java.io.IOException;
 
@@ -74,8 +75,8 @@ public class KTableAggregateTest {
                 stringSerde,
                 "topic1-Canonized");
 
-        MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
-        table2.toStream().process(proc2);
+        MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
+        table2.toStream().process(proc);
 
         driver = new KStreamTestDriver(builder, stateDir);
 
@@ -91,12 +92,12 @@ public class KTableAggregateTest {
         assertEquals(Utils.mkList(
                 "A:0+1",
                 "B:0+2",
-                "A:0+1+3", "A:0+1+3-1",
-                "B:0+2+4", "B:0+2+4-2",
+                "A:0+1-1", "A:0+1-1+3",
+                "B:0+2-2", "B:0+2-2+4",
                 "C:0+5",
                 "D:0+6",
-                "B:0+2+4-2+7", "B:0+2+4-2+7-4",
-                "C:0+5+8", "C:0+5+8-5"), proc2.processed);
+                "B:0+2-2+4-4", "B:0+2-2+4-4+7",
+                "C:0+5-5", "C:0+5-5+8"), proc.processed);
     }
 
     @Test
@@ -109,11 +110,11 @@ public class KTableAggregateTest {
             @Override
                 public KeyValue<String, String> apply(String key, String value) {
                     if (key.equals("null")) {
-                        return KeyValue.pair(null, value + "s");
+                        return KeyValue.pair(null, value);
                     } else if (key.equals("NULL")) {
                         return null;
                     } else {
-                        return KeyValue.pair(value, value + "s");
+                        return KeyValue.pair(value, value);
                     }
                 }
             },
@@ -126,12 +127,14 @@ public class KTableAggregateTest {
                 stringSerde,
                 "topic1-Canonized");
 
-        MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
-        table2.toStream().process(proc2);
+        MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
+        table2.toStream().process(proc);
 
         driver = new KStreamTestDriver(builder, stateDir);
 
         driver.process(topic1, "A", "1");
+        driver.process(topic1, "A", null);
+        driver.process(topic1, "A", "1");
         driver.process(topic1, "B", "2");
         driver.process(topic1, "null", "3");
         driver.process(topic1, "B", "4");
@@ -139,11 +142,14 @@ public class KTableAggregateTest {
         driver.process(topic1, "B", "7");
 
         assertEquals(Utils.mkList(
-                "1:0+1s",
-                "2:0+2s",
-                "4:0+4s",
-                "2:0+2s-2s",
-                "7:0+7s",
-                "4:0+4s-4s"), proc2.processed);
+                "1:0+1",
+                "1:0+1-1",
+                "1:0+1-1+1",
+                "2:0+2",
+                // noop
+                "2:0+2-2", "4:0+4",
+                // noop
+                "4:0+4-4", "7:0+7"
+                ), proc.processed);
     }
 }


Mime
View raw message