This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6281fbc KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 3] (#5052)
6281fbc is described below
commit 6281fbcb6ad3c1f7172af0ba590fa1e2605093de
Author: Filipe Agapito <filipe.agapito@gmail.com>
AuthorDate: Tue May 22 16:48:54 2018 +0100
KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 3] (#5052)
* KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 3]
* Refactor:
- KStreamWindowReduceTest
- KTableMapKeysTest
- SessionWindowedKStreamImplTest
- TimeWindowedKStreamImplTest
* Remove unnecessary @SuppressWarnings("unchecked")
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>
---
.../kstream/internals/KStreamWindowReduceTest.java | 29 ++++---
.../kstream/internals/KTableMapKeysTest.java | 38 ++++-----
.../internals/SessionWindowedKStreamImplTest.java | 91 +++++++++++----------
.../internals/TimeWindowedKStreamImplTest.java | 95 ++++++++++++----------
4 files changed, 130 insertions(+), 123 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
index aa23971..4ae2f76 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
@@ -17,25 +17,32 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
+import java.util.Properties;
+
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
public class KStreamWindowReduceTest {
+
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(),
Serdes.String());
+ private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new
StringSerializer(), new StringSerializer());
+
@Test
public void shouldLogAndMeterOnNullKey() {
- final KStreamTestDriver driver = new KStreamTestDriver();
final StreamsBuilder builder = new StreamsBuilder();
builder
@@ -49,14 +56,14 @@ public class KStreamWindowReduceTest {
}
});
- driver.setUp(builder, TestUtils.tempDirectory(), 0);
- final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
- driver.process("TOPIC", null, "asdf");
- driver.flushState();
- LogCaptureAppender.unregister(appender);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props))
{
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ driver.pipeInput(recordFactory.create("TOPIC", null, "asdf"));
+ LogCaptureAppender.unregister(appender);
- assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total",
"stream-metrics").metricValue());
- assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[asdf]
topic=[TOPIC] partition=[-1] offset=[-1]"));
+ assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total",
"stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key.
value=[asdf] topic=[TOPIC] partition=[0] offset=[0]"));
+ }
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 14552d6..081c6a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -17,40 +17,30 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
-import java.io.File;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import static org.junit.Assert.assertEquals;
public class KTableMapKeysTest {
- final private Serde<String> stringSerde = new Serdes.StringSerde();
- final private Serde<Integer> integerSerde = new Serdes.IntegerSerde();
- private File stateDir = null;
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
-
-
- @Before
- public void setUp() {
- stateDir = TestUtils.tempDirectory("kafka-test");
- }
+ private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new
IntegerSerializer(), new StringSerializer());
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(),
Serdes.String());
@Test
public void testMapKeysConvertingToStream() {
@@ -58,7 +48,7 @@ public class KTableMapKeysTest {
String topic1 = "topic_map_keys";
- KTable<Integer, String> table1 = builder.table(topic1, Consumed.with(integerSerde,
stringSerde));
+ KTable<Integer, String> table1 = builder.table(topic1, Consumed.with(Serdes.Integer(),
Serdes.String()));
final Map<Integer, String> keyMap = new HashMap<>();
keyMap.put(1, "ONE");
@@ -82,11 +72,11 @@ public class KTableMapKeysTest {
convertedStream.process(supplier);
- driver.setUp(builder, stateDir);
- for (int i = 0; i < originalKeys.length; i++) {
- driver.process(topic1, originalKeys[i], values[i]);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props))
{
+ for (int i = 0; i < originalKeys.length; i++) {
+ driver.pipeInput(recordFactory.create(topic1, originalKeys[i], values[i]));
+ }
}
- driver.flushState();
assertEquals(3, supplier.theCapturedProcessor().processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 08fa65c..825edb3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -18,10 +18,12 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
@@ -31,20 +33,19 @@ import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -53,9 +54,9 @@ public class SessionWindowedKStreamImplTest {
private static final String TOPIC = "input";
private final StreamsBuilder builder = new StreamsBuilder();
+ private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new
StringSerializer(), new StringSerializer());
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(),
Serdes.String());
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
private final Merger<String, String> sessionMerger = new Merger<String, String>()
{
@Override
public String apply(final String aggKey, final String aggOne, final String aggTwo)
{
@@ -83,7 +84,9 @@ public class SessionWindowedKStreamImplTest {
}
});
- processData();
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props,
0L)) {
+ processData(driver);
+ }
assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo(2L));
assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo(1L));
assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo(1L));
@@ -101,7 +104,9 @@ public class SessionWindowedKStreamImplTest {
}
});
- processData();
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props,
0L)) {
+ processData(driver);
+ }
assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("1+2"));
assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("1"));
assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("3"));
@@ -121,42 +126,45 @@ public class SessionWindowedKStreamImplTest {
results.put(key, value);
}
});
- processData();
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props,
0L)) {
+ processData(driver);
+ }
assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("0+0+1+2"));
assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("0+1"));
assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("0+3"));
}
- @SuppressWarnings("unchecked")
@Test
public void shouldMaterializeCount() {
stream.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("count-store"));
- processData();
- final SessionStore<String, Long> store = (SessionStore<String, Long>)
driver.allStateStores().get("count-store");
- final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1",
"2"));
- assertThat(data, equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
- KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
- KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), 1L))));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props,
0L)) {
+ processData(driver);
+ final SessionStore<String, Long> store = driver.getSessionStore("count-store");
+ final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1",
"2"));
+ assertThat(data, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)),
1L),
+ KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)),
1L))));
+ }
}
- @SuppressWarnings("unchecked")
@Test
public void shouldMaterializeReduced() {
stream.reduce(MockReducer.STRING_ADDER, Materialized.<String, String, SessionStore<Bytes,
byte[]>>as("reduced"));
- processData();
- final SessionStore<String, String> sessionStore = (SessionStore<String,
String>) driver.allStateStores().get("reduced");
- final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1",
"2"));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props,
0L)) {
+ processData(driver);
+ final SessionStore<String, String> sessionStore = driver.getSessionStore("reduced");
+ final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1",
"2"));
- assertThat(data, equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"),
- KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
- KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "1"))));
+ assertThat(data, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"),
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)),
"3"),
+ KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)),
"1"))));
+ }
}
- @SuppressWarnings("unchecked")
@Test
public void shouldMaterializeAggregated() {
stream.aggregate(MockInitializer.STRING_INIT,
@@ -164,13 +172,15 @@ public class SessionWindowedKStreamImplTest {
sessionMerger,
Materialized.<String, String, SessionStore<Bytes, byte[]>>as("aggregated").withValueSerde(Serdes.String()));
- processData();
- final SessionStore<String, String> sessionStore = (SessionStore<String,
String>) driver.allStateStores().get("aggregated");
- final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1",
"2"));
- assertThat(data, equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"),
- KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
- KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "0+1"))));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props,
0L)) {
+ processData(driver);
+ final SessionStore<String, String> sessionStore = driver.getSessionStore("aggregated");
+ final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1",
"2"));
+ assertThat(data, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"),
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)),
"0+3"),
+ KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)),
"0+1"))));
+ }
}
@Test(expected = NullPointerException.class)
@@ -243,16 +253,11 @@ public class SessionWindowedKStreamImplTest {
stream.count(null);
}
- private void processData() {
- driver.setUp(builder, TestUtils.tempDirectory(), 0);
- driver.setTime(10);
- driver.process(TOPIC, "1", "1");
- driver.setTime(15);
- driver.process(TOPIC, "1", "2");
- driver.setTime(600);
- driver.process(TOPIC, "1", "3");
- driver.process(TOPIC, "2", "1");
- driver.flushState();
+ private void processData(final TopologyTestDriver driver) {
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "2", 15));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "3", 600));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "1", 600));
}
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index 610e52f..7b885b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -18,32 +18,33 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -52,9 +53,8 @@ public class TimeWindowedKStreamImplTest {
private static final String TOPIC = "input";
private final StreamsBuilder builder = new StreamsBuilder();
-
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new
StringSerializer(), new StringSerializer());
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(),
Serdes.String());
private TimeWindowedKStream<String, String> windowedStream;
@Before
@@ -76,7 +76,9 @@ public class TimeWindowedKStreamImplTest {
}
});
- processData();
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props,
0L)) {
+ processData(driver);
+ }
assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo(2L));
assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo(1L));
assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo(1L));
@@ -95,7 +97,9 @@ public class TimeWindowedKStreamImplTest {
}
});
- processData();
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props,
0L)) {
+ processData(driver);
+ }
assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("1+2"));
assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("1"));
assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("3"));
@@ -115,29 +119,32 @@ public class TimeWindowedKStreamImplTest {
results.put(key, value);
}
});
- processData();
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props,
0L)) {
+ processData(driver);
+ }
assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("0+1+2"));
assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("0+1"));
assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("0+3"));
}
- @SuppressWarnings("unchecked")
@Test
public void shouldMaterializeCount() {
windowedStream.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()));
- processData();
- final WindowStore<String, Long> windowStore = (WindowStore<String, Long>)
driver.allStateStores().get("count-store");
- final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1",
"2", 0, 1000));
- assertThat(data, equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L))));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props,
0L)) {
+ processData(driver);
+ final WindowStore<String, Long> windowStore = driver.getWindowStore("count-store");
+ final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1",
"2", 0, 1000));
+
+ assertThat(data, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L))));
+ }
}
- @SuppressWarnings("unchecked")
@Test
public void shouldMaterializeReduced() {
windowedStream.reduce(MockReducer.STRING_ADDER,
@@ -145,17 +152,18 @@ public class TimeWindowedKStreamImplTest {
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
- processData();
- final WindowStore<String, String> windowStore = (WindowStore<String, String>)
driver.allStateStores().get("reduced");
- final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1",
"2", 0, 1000));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props,
0L)) {
+ processData(driver);
+ final WindowStore<String, String> windowStore = driver.getWindowStore("reduced");
+ final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1",
"2", 0, 1000));
- assertThat(data, equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1"))));
+ assertThat(data, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1"))));
+ }
}
- @SuppressWarnings("unchecked")
@Test
public void shouldMaterializeAggregated() {
windowedStream.aggregate(MockInitializer.STRING_INIT,
@@ -164,13 +172,15 @@ public class TimeWindowedKStreamImplTest {
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
- processData();
- final WindowStore<String, String> windowStore = (WindowStore<String, String>)
driver.allStateStores().get("aggregated");
- final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1",
"2", 0, 1000));
- assertThat(data, equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1"))));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props,
0L)) {
+ processData(driver);
+ final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated");
+ final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1",
"2", 0, 1000));
+ assertThat(data, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1"))));
+ }
}
@Test(expected = NullPointerException.class)
@@ -227,16 +237,11 @@ public class TimeWindowedKStreamImplTest {
windowedStream.count(null);
}
- private void processData() {
- driver.setUp(builder, TestUtils.tempDirectory(), 0);
- driver.setTime(10);
- driver.process(TOPIC, "1", "1");
- driver.setTime(15);
- driver.process(TOPIC, "1", "2");
- driver.setTime(500);
- driver.process(TOPIC, "1", "3");
- driver.process(TOPIC, "2", "1");
- driver.flushState();
+ private void processData(final TopologyTestDriver driver) {
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10L));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "2", 15L));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "3", 500L));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "1", 500L));
}
}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.