kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-2984: KTable should send old values when required
Date Wed, 16 Dec 2015 23:37:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 841d2d1a2 -> 587a2f4ef


http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 97aca3d..187a6f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -34,6 +34,7 @@ import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class KTableSourceTest {
 
@@ -114,4 +115,92 @@ public class KTableSourceTest {
         }
     }
 
+    @Test
+    public void testNotSedingOldValue() throws IOException {
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+
+            String topic1 = "topic1";
+
+            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>)
+                    builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+
+            MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+
+            builder.addProcessor("proc1", proc1, table1.name);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            driver.process(topic1, "A", "01");
+            driver.process(topic1, "B", "01");
+            driver.process(topic1, "C", "01");
+
+            proc1.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+
+            driver.process(topic1, "A", "02");
+            driver.process(topic1, "B", "02");
+
+            proc1.checkAndClearResult("A:(02<-null)", "B:(02<-null)");
+
+            driver.process(topic1, "A", "03");
+
+            proc1.checkAndClearResult("A:(03<-null)");
+
+            driver.process(topic1, "A", null);
+            driver.process(topic1, "B", null);
+
+            proc1.checkAndClearResult("A:(null<-null)", "B:(null<-null)");
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
+    @Test
+    public void testSedingOldValue() throws IOException {
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+
+            String topic1 = "topic1";
+
+            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>)
+                    builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+
+            table1.enableSendingOldValues();
+
+            assertTrue(table1.sendingOldValueEnabled());
+
+            MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+
+            builder.addProcessor("proc1", proc1, table1.name);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            driver.process(topic1, "A", "01");
+            driver.process(topic1, "B", "01");
+            driver.process(topic1, "C", "01");
+
+            proc1.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+
+            driver.process(topic1, "A", "02");
+            driver.process(topic1, "B", "02");
+
+            proc1.checkAndClearResult("A:(02<-01)", "B:(02<-01)");
+
+            driver.process(topic1, "A", "03");
+
+            proc1.checkAndClearResult("A:(03<-02)");
+
+            driver.process(topic1, "A", null);
+            driver.process(topic1, "B", null);
+
+            proc1.checkAndClearResult("A:(null<-03)", "B:(null<-02)");
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/587a2f4e/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index 510b458..b402525 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -60,10 +60,10 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
     }
 
     public void checkAndClearResult(String... expected) {
-        assertEquals(expected.length, processed.size());
+        assertEquals("the number of outputs:", expected.length, processed.size());
 
         for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processed.get(i));
+            assertEquals("output[" + i + "]:", expected[i], processed.get(i));
         }
 
         processed.clear();


Mime
View raw message