kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 3] (#5052)
Date Tue, 22 May 2018 15:49:11 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6281fbc  KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 3] (#5052)
6281fbc is described below

commit 6281fbcb6ad3c1f7172af0ba590fa1e2605093de
Author: Filipe Agapito <filipe.agapito@gmail.com>
AuthorDate: Tue May 22 16:48:54 2018 +0100

    KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 3] (#5052)
    
    * KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 3]
    
    * Refactor:
      - KStreamWindowReduceTest
      - KTableMapKeysTest
      - SessionWindowedKStreamImplTest
      - TimeWindowedKStreamImplTest
    
    * Remove unnecessary @SuppressWarnings(unchecked)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../kstream/internals/KStreamWindowReduceTest.java | 29 ++++---
 .../kstream/internals/KTableMapKeysTest.java       | 38 ++++-----
 .../internals/SessionWindowedKStreamImplTest.java  | 91 +++++++++++----------
 .../internals/TimeWindowedKStreamImplTest.java     | 95 ++++++++++++----------
 4 files changed, 130 insertions(+), 123 deletions(-)
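
For readers skimming the diff below: all four test classes get the same treatment. The internal KStreamTestDriver (JUnit @Rule plus setUp/process/flushState calls) is replaced by the public TopologyTestDriver, records are fed through a ConsumerRecordFactory with explicit timestamps, the driver is closed via try-with-resources, and state stores and metrics are read back through the driver itself. The following is a minimal, self-contained sketch of that pattern; the topic name, class name, and the contents of the Properties are illustrative only, and the Properties block is a hypothetical stand-in for what StreamsTestUtils.topologyTestConfig(...) is assumed to build (application id, dummy bootstrap servers, default serdes, temp state dir), not the actual helper from this commit.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class TopologyTestDriverPatternSketch {

    public static void main(final String[] args) {
        // Build a trivial topology; the real tests build windowed
        // aggregations, but any topology is driven the same way.
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .foreach((key, value) -> System.out.println(key + " -> " + value));

        // Hypothetical stand-in for StreamsTestUtils.topologyTestConfig(...):
        // plain Properties with the usual TopologyTestDriver settings.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test-driver-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.STATE_DIR_CONFIG, System.getProperty("java.io.tmpdir"));

        // Records carry explicit timestamps instead of driver.setTime(...).
        final ConsumerRecordFactory<String, String> recordFactory =
                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());

        // try-with-resources replaces the old @Rule/setUp/flushState lifecycle;
        // the third constructor argument pins the initial wall-clock time to 0.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
            driver.pipeInput(recordFactory.create("input-topic", "key", "value", 10L));
            // State stores and metrics are then read through the driver,
            // e.g. driver.getWindowStore("store-name") or driver.metrics().
        }
    }
}
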

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
index aa23971..4ae2f76 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
@@ -17,25 +17,32 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
+import java.util.Properties;
+
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 public class KStreamWindowReduceTest {
+
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+
     @Test
     public void shouldLogAndMeterOnNullKey() {
-        final KStreamTestDriver driver = new KStreamTestDriver();
 
         final StreamsBuilder builder = new StreamsBuilder();
         builder
@@ -49,14 +56,14 @@ public class KStreamWindowReduceTest {
                 }
             });
 
-        driver.setUp(builder, TestUtils.tempDirectory(), 0);
 
-        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        driver.process("TOPIC", null, "asdf");
-        driver.flushState();
-        LogCaptureAppender.unregister(appender);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+            driver.pipeInput(recordFactory.create("TOPIC", null, "asdf"));
+            LogCaptureAppender.unregister(appender);
 
-        assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[asdf] topic=[TOPIC] partition=[-1] offset=[-1]"));
+            assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+            assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[asdf] topic=[TOPIC] partition=[0] offset=[0]"));
+        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 14552d6..081c6a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -17,40 +17,30 @@
 package org.apache.kafka.streams.kstream.internals;
 
 
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 
 public class KTableMapKeysTest {
 
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
-    final private Serde<Integer>  integerSerde = new Serdes.IntegerSerde();
-    private File stateDir = null;
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-
-    
-    @Before
-     public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
-    }
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testMapKeysConvertingToStream() {
@@ -58,7 +48,7 @@ public class KTableMapKeysTest {
 
         String topic1 = "topic_map_keys";
 
-        KTable<Integer, String> table1 = builder.table(topic1, Consumed.with(integerSerde, stringSerde));
+        KTable<Integer, String> table1 = builder.table(topic1, Consumed.with(Serdes.Integer(), Serdes.String()));
 
         final Map<Integer, String> keyMap = new HashMap<>();
         keyMap.put(1, "ONE");
@@ -82,11 +72,11 @@ public class KTableMapKeysTest {
 
         convertedStream.process(supplier);
 
-        driver.setUp(builder, stateDir);
-        for (int i = 0;  i < originalKeys.length; i++) {
-            driver.process(topic1, originalKeys[i], values[i]);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int i = 0; i < originalKeys.length; i++) {
+                driver.pipeInput(recordFactory.create(topic1, originalKeys[i], values[i]));
+            }
         }
-        driver.flushState();
 
         assertEquals(3, supplier.theCapturedProcessor().processed.size());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 08fa65c..825edb3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -18,10 +18,12 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -31,20 +33,19 @@ import org.apache.kafka.streams.kstream.SessionWindowedKStream;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -53,9 +54,9 @@ public class SessionWindowedKStreamImplTest {
 
     private static final String TOPIC = "input";
     private final StreamsBuilder builder = new StreamsBuilder();
+    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
     private final Merger<String, String> sessionMerger = new Merger<String, String>() {
         @Override
         public String apply(final String aggKey, final String aggOne, final String aggTwo) {
@@ -83,7 +84,9 @@ public class SessionWindowedKStreamImplTest {
                     }
                 });
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo(2L));
         assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo(1L));
         assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo(1L));
@@ -101,7 +104,9 @@ public class SessionWindowedKStreamImplTest {
                     }
                 });
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("1+2"));
         assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("1"));
         assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("3"));
@@ -121,42 +126,45 @@ public class SessionWindowedKStreamImplTest {
                         results.put(key, value);
                     }
                 });
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("0+0+1+2"));
         assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("0+1"));
         assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("0+3"));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeCount() {
         stream.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("count-store"));
 
-        processData();
-        final SessionStore<String, Long> store = (SessionStore<String, Long>) driver.allStateStores().get("count-store");
-        final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1", "2"));
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
-                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
-                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), 1L))));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final SessionStore<String, Long> store = driver.getSessionStore("count-store");
+            final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1", "2"));
+            assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
+                    KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
+                    KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), 1L))));
+        }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeReduced() {
         stream.reduce(MockReducer.STRING_ADDER, Materialized.<String, String, SessionStore<Bytes, byte[]>>as("reduced"));
 
-        processData();
-        final SessionStore<String, String> sessionStore = (SessionStore<String, String>) driver.allStateStores().get("reduced");
-        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final SessionStore<String, String> sessionStore = driver.getSessionStore("reduced");
+            final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
 
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"),
-                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
-                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "1"))));
+            assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"),
+                    KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
+                    KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "1"))));
+        }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeAggregated() {
         stream.aggregate(MockInitializer.STRING_INIT,
@@ -164,13 +172,15 @@ public class SessionWindowedKStreamImplTest {
                          sessionMerger,
                          Materialized.<String, String, SessionStore<Bytes, byte[]>>as("aggregated").withValueSerde(Serdes.String()));
 
-        processData();
-        final SessionStore<String, String> sessionStore = (SessionStore<String, String>) driver.allStateStores().get("aggregated");
-        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"),
-                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
-                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "0+1"))));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final SessionStore<String, String> sessionStore = driver.getSessionStore("aggregated");
+            final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
+            assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"),
+                    KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
+                    KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "0+1"))));
+        }
     }
 
     @Test(expected = NullPointerException.class)
@@ -243,16 +253,11 @@ public class SessionWindowedKStreamImplTest {
         stream.count(null);
     }
 
-    private void processData() {
-        driver.setUp(builder, TestUtils.tempDirectory(), 0);
-        driver.setTime(10);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(15);
-        driver.process(TOPIC, "1", "2");
-        driver.setTime(600);
-        driver.process(TOPIC, "1", "3");
-        driver.process(TOPIC, "2", "1");
-        driver.flushState();
+    private void processData(final TopologyTestDriver driver) {
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "2", 15));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "3", 600));
+        driver.pipeInput(recordFactory.create(TOPIC, "2", "1", 600));
     }
 
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index 610e52f..7b885b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -18,32 +18,33 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -52,9 +53,8 @@ public class TimeWindowedKStreamImplTest {
 
     private static final String TOPIC = "input";
     private final StreamsBuilder builder = new StreamsBuilder();
-
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
     private TimeWindowedKStream<String, String> windowedStream;
 
     @Before
@@ -76,7 +76,9 @@ public class TimeWindowedKStreamImplTest {
                     }
                 });
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo(2L));
         assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo(1L));
         assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo(1L));
@@ -95,7 +97,9 @@ public class TimeWindowedKStreamImplTest {
                     }
                 });
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("1+2"));
         assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("1"));
         assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("3"));
@@ -115,29 +119,32 @@ public class TimeWindowedKStreamImplTest {
                         results.put(key, value);
                     }
                 });
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("0+1+2"));
         assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("0+1"));
         assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("0+3"));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeCount() {
         windowedStream.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
                                      .withKeySerde(Serdes.String())
                                      .withValueSerde(Serdes.Long()));
 
-        processData();
-        final WindowStore<String, Long> windowStore = (WindowStore<String, Long>) driver.allStateStores().get("count-store");
-        final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
-                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L))));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final WindowStore<String, Long> windowStore = driver.getWindowStore("count-store");
+            final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+
+            assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L))));
+        }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeReduced() {
         windowedStream.reduce(MockReducer.STRING_ADDER,
@@ -145,17 +152,18 @@ public class TimeWindowedKStreamImplTest {
                                       .withKeySerde(Serdes.String())
                                       .withValueSerde(Serdes.String()));
 
-        processData();
-        final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("reduced");
-        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final WindowStore<String, String> windowStore = driver.getWindowStore("reduced");
+            final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
 
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
-                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1"))));
+            assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1"))));
+        }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeAggregated() {
         windowedStream.aggregate(MockInitializer.STRING_INIT,
@@ -164,13 +172,15 @@ public class TimeWindowedKStreamImplTest {
                                          .withKeySerde(Serdes.String())
                                          .withValueSerde(Serdes.String()));
 
-        processData();
-        final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("aggregated");
-        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
-                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1"))));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated");
+            final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+            assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1"))));
+        }
     }
 
     @Test(expected = NullPointerException.class)
@@ -227,16 +237,11 @@ public class TimeWindowedKStreamImplTest {
         windowedStream.count(null);
     }
 
-    private void processData() {
-        driver.setUp(builder, TestUtils.tempDirectory(), 0);
-        driver.setTime(10);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(15);
-        driver.process(TOPIC, "1", "2");
-        driver.setTime(500);
-        driver.process(TOPIC, "1", "3");
-        driver.process(TOPIC, "2", "1");
-        driver.flushState();
+    private void processData(final TopologyTestDriver driver) {
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10L));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "2", 15L));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "3", 500L));
+        driver.pipeInput(recordFactory.create(TOPIC, "2", "1", 500L));
     }
 
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.
