kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: MINOR: speed up streams integration tests
Date Mon, 20 Jun 2016 21:37:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4544ee448 -> 44c8308ab


MINOR: speed up streams integration tests

... by:
a) merging some tests for startup/shutdown efficiency,
b) using independent state dirs, and
c) removing some tests that are covered elsewhere.

guozhangwang ewencp - the tests now run much more quickly, down to about 1 minute on my
laptop (from about 2-3 minutes). There were some issues with state dirs in some of the integration
tests that were causing the shutdown of the streams apps to take a long time.
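
For context, the state-dir change amounts to giving each test its own throwaway directory
instead of the shared, hard-coded /tmp/kafka-streams path; the diff below does this via
StateTestUtils.tempDir(). A minimal standalone sketch of the idea, using plain java.nio.file
rather than the project's helper ("state.dir" is the key behind StreamsConfig.STATE_DIR_CONFIG):

    import java.io.File;
    import java.nio.file.Files;
    import java.util.Properties;

    public class PerTestStateDir {

        // Each run gets a unique state directory, so repeated or concurrent test runs
        // never contend for the same state stores and shutdown stays fast.
        public static Properties streamsConfigWithIsolatedStateDir() throws Exception {
            final File stateDir = Files.createTempDirectory("streams-test").toFile();
            stateDir.deleteOnExit();

            final Properties config = new Properties();
            config.put("state.dir", stateDir.getPath()); // i.e. StreamsConfig.STATE_DIR_CONFIG
            return config;
        }
    }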

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Ismael Juma, Guozhang Wang

Closes #1525 from dguy/integration-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/44c8308a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/44c8308a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/44c8308a

Branch: refs/heads/trunk
Commit: 44c8308ab1a6b950c5b12386d7864881b5d052a0
Parents: 4544ee4
Author: Damian Guy <damian.guy@gmail.com>
Authored: Mon Jun 20 14:37:43 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Jun 20 14:37:43 2016 -0700

----------------------------------------------------------------------
 .../InternalTopicIntegrationTest.java           |   3 +-
 .../integration/JoinIntegrationTest.java        |   4 +-
 .../KGroupedStreamIntegrationTest.java          |   3 +-
 .../integration/KStreamRepartitionJoinTest.java | 348 ++++++-------------
 .../integration/RegexSourceIntegrationTest.java |   1 -
 .../integration/WordCountIntegrationTest.java   |   2 +-
 .../kafka/streams/state/StateTestUtils.java     |   2 +-
 7 files changed, 122 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/44c8308a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index b642b2a..addebae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.state.StateTestUtils;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -122,7 +123,7 @@ public class InternalTopicIntegrationTest {
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, StateTestUtils.tempDir().getPath());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         KStreamBuilder builder = new KStreamBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/44c8308a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index ea216f3..f251a85 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.StateTestUtils;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -147,7 +148,8 @@ public class JoinIntegrationTest {
         // StreamsConfig configuration (so we can retrieve whatever state directory Streams came up
         // with automatically) we don't need to set this anymore and can update `purgeLocalStreamsState`
         // accordingly.
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
+                                 StateTestUtils.tempDir().getPath());
 
         // Remove any state from previous test runs
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);

http://git-wip-us.apache.org/repos/asf/kafka/blob/44c8308a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
index 44e92f7..1ec6573 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.StateTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -80,7 +81,7 @@ public class KGroupedStreamIntegrationTest {
             .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kgrouped-stream-test");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, StateTestUtils.tempDir().getPath());
 
         KeyValueMapper<Integer, String, String>
             mapper =

http://git-wip-us.apache.org/repos/asf/kafka/blob/44c8308a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 221d349..caf326d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -23,14 +23,12 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.StateTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -53,15 +51,11 @@ public class KStreamRepartitionJoinTest {
     public static final EmbeddedSingleNodeKafkaCluster CLUSTER =
         new EmbeddedSingleNodeKafkaCluster();
 
-    private static volatile int testNo = 0;
-
     private KStreamBuilder builder;
     private Properties streamsConfiguration;
     private KStream<Long, Integer> streamOne;
     private KStream<Integer, String> streamTwo;
-    private KStream<Integer, Integer> streamThree;
     private KStream<Integer, String> streamFour;
-    private KTable<Integer, String> kTable;
     private ValueJoiner<Integer, String, String> valueJoiner;
     private KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>>
         keyMapper;
@@ -72,16 +66,12 @@ public class KStreamRepartitionJoinTest {
     private String streamOneInput;
     private String streamTwoInput;
     private String streamFourInput;
-    private String tableInput;
-    private String outputTopic;
-    private String streamThreeInput;
 
 
 
     @Before
     public void before() {
-        testNo++;
-        String applicationId = "kstream-repartition-join-test" + testNo;
+        String applicationId = "kstream-repartition-join-test";
         builder = new KStreamBuilder();
         createTopics();
         streamsConfiguration = new Properties();
@@ -91,15 +81,14 @@ public class KStreamRepartitionJoinTest {
             .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kstream-repartition-test");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, StateTestUtils.tempDir().getPath());
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
+
 
         streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
         streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
-        streamThree = builder.stream(Serdes.Integer(), Serdes.Integer(), streamThreeInput);
         streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput);
 
-        kTable = builder.table(Serdes.Integer(), Serdes.String(), tableInput);
-
         valueJoiner = new ValueJoiner<Integer, String, String>() {
             @Override
             public String apply(final Integer value1, final String value2) {
@@ -124,16 +113,37 @@ public class KStreamRepartitionJoinTest {
     }
 
     @Test
-    public void shouldMapStreamOneAndJoin() throws ExecutionException, InterruptedException {
+    public void shouldCorrectlyRepartitionOnJoinOperations() throws Exception {
         produceMessages();
-        doJoin(streamOne.map(keyMapper), streamTwo);
+
+        final ExpectedOutputOnTopic mapOne = mapStreamOneAndJoin();
+        final ExpectedOutputOnTopic mapBoth = mapBothStreamsAndJoin();
+        final ExpectedOutputOnTopic mapMapJoin = mapMapJoin();
+        final ExpectedOutputOnTopic selectKeyJoin = selectKeyAndJoin();
+        final ExpectedOutputOnTopic flatMapJoin = flatMapJoin();
+        final ExpectedOutputOnTopic mapRhs = joinMappedRhsStream();
+        final ExpectedOutputOnTopic mapJoinJoin = joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined();
+        final ExpectedOutputOnTopic leftJoin = mapBothStreamsAndLeftJoin();
+
         startStreams();
-        verifyCorrectOutput(expectedStreamOneTwoJoin);
+
+        verifyCorrectOutput(mapOne);
+        verifyCorrectOutput(mapBoth);
+        verifyCorrectOutput(mapMapJoin);
+        verifyCorrectOutput(selectKeyJoin);
+        verifyCorrectOutput(flatMapJoin);
+        verifyCorrectOutput(mapRhs);
+        verifyCorrectOutput(mapJoinJoin);
+        verifyLeftJoin(leftJoin);
     }
 
-    @Test
-    public void shouldMapBothStreamsAndJoin() throws Exception {
-        produceMessages();
+    private ExpectedOutputOnTopic mapStreamOneAndJoin() {
+        String mapOneStreamAndJoinOutput = "map-one-join-output";
+        doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput, "map-one-join");
+        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput);
+    }
+
+    private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws Exception {
 
         final KStream<Integer, Integer>
             map1 =
@@ -148,33 +158,30 @@ public class KStreamRepartitionJoinTest {
                 }
             });
 
-        doJoin(map1, map2);
-        startStreams();
-        verifyCorrectOutput(expectedStreamOneTwoJoin);
-
+        doJoin(map1, map2, "map-both-streams-and-join", "map-both-join");
+        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join");
     }
 
-    @Test
-    public void shouldMapMapJoin() throws Exception {
-        produceMessages();
 
+    private ExpectedOutputOnTopic mapMapJoin() throws Exception {
         final KStream<Integer, Integer> mapMapStream = streamOne.map(
             new KeyValueMapper<Long, Integer, KeyValue<Long, Integer>>() {
                 @Override
                 public KeyValue<Long, Integer> apply(Long key, Integer value) {
+                    if (value == null) {
+                        return new KeyValue<>(null, null);
+                    }
                     return new KeyValue<>(key + value, value);
                 }
             }).map(keyMapper);
 
-        doJoin(mapMapStream, streamTwo);
-        startStreams();
-        verifyCorrectOutput(expectedStreamOneTwoJoin);
+        String outputTopic = "map-map-join";
+        doJoin(mapMapStream, streamTwo, outputTopic, outputTopic);
+        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
 
 
-    @Test
-    public void shouldSelectKeyAndJoin() throws ExecutionException, InterruptedException {
-        produceMessages();
+    public ExpectedOutputOnTopic selectKeyAndJoin() throws ExecutionException, InterruptedException {
 
         final KStream<Integer, Integer>
             keySelected =
@@ -185,16 +192,13 @@ public class KStreamRepartitionJoinTest {
                 }
             });
 
-        doJoin(keySelected, streamTwo);
-        startStreams();
-        verifyCorrectOutput(expectedStreamOneTwoJoin);
+        String outputTopic = "select-key-join";
+        doJoin(keySelected, streamTwo, outputTopic, outputTopic);
+        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
 
 
-    @Test
-    public void shouldFlatMapJoin() throws Exception {
-        produceMessages();
-
+    private ExpectedOutputOnTopic flatMapJoin() throws Exception {
         final KStream<Integer, Integer> flatMapped = streamOne.flatMap(
             new KeyValueMapper<Long, Integer, Iterable<KeyValue<Integer, Integer>>>() {
                 @Override
@@ -204,22 +208,13 @@ public class KStreamRepartitionJoinTest {
                 }
             });
 
-        doJoin(flatMapped, streamTwo);
-        startStreams();
-        verifyCorrectOutput(expectedStreamOneTwoJoin);
-    }
+        String outputTopic = "flat-map-join";
+        doJoin(flatMapped, streamTwo, outputTopic, outputTopic);
 
-    @Test
-    public void shouldJoinTwoStreamsPartitionedTheSame() throws Exception {
-        produceMessages();
-        doJoin(streamThree, streamTwo);
-        startStreams();
-        verifyCorrectOutput(Arrays.asList("10:A", "20:B", "30:C", "40:D", "50:E"));
+        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
 
-    @Test
-    public void shouldJoinWithRhsStreamMapped() throws Exception {
-        produceMessages();
+    private ExpectedOutputOnTopic joinMappedRhsStream() throws Exception {
 
         ValueJoiner<String, Integer, String> joiner = new ValueJoiner<String, Integer, String>() {
             @Override
@@ -227,39 +222,21 @@ public class KStreamRepartitionJoinTest {
                 return value1 + ":" + value2;
             }
         };
+        String output = "join-rhs-stream-mapped";
         streamTwo
             .join(streamOne.map(keyMapper),
                   joiner,
-                  JoinWindows.of("the-join").within(60 * 1000),
+                  JoinWindows.of(output).within(60 * 1000),
                   Serdes.Integer(),
                   Serdes.String(),
                   Serdes.Integer())
-            .to(Serdes.Integer(), Serdes.String(), outputTopic);
+            .to(Serdes.Integer(), Serdes.String(), output);
 
-        startStreams();
-        verifyCorrectOutput(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"));
+        return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"),
+                            output);
     }
 
-    @Test
-    public void shouldLeftJoinTwoStreamsPartitionedTheSame() throws Exception {
-        produceMessages();
-        doLeftJoin(streamThree, streamTwo);
-        startStreams();
-        verifyCorrectOutput(Arrays.asList("10:A", "20:B", "30:C", "40:D", "50:E"));
-    }
-
-    @Test
-    public void shouldMapStreamOneAndLeftJoin() throws ExecutionException, InterruptedException {
-        produceMessages();
-        doLeftJoin(streamOne.map(keyMapper), streamTwo);
-        startStreams();
-        verifyCorrectOutput(expectedStreamOneTwoJoin);
-    }
-
-    @Test
-    public void shouldMapBothStreamsAndLeftJoin() throws Exception {
-        produceMessages();
-
+    public ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws Exception {
         final KStream<Integer, Integer>
             map1 =
             streamOne.map(keyMapper);
@@ -273,102 +250,20 @@ public class KStreamRepartitionJoinTest {
                 }
             });
 
-        doLeftJoin(map1, map2);
-        startStreams();
-
-        List<String> received = receiveMessages(new StringDeserializer(), 5);
-
-        if (!received.equals(expectedStreamOneTwoJoin)) {
-            produceToStreamOne();
-            verifyCorrectOutput(expectedStreamOneTwoJoin);
-        }
-
-    }
-
-    @Test
-    public void shouldLeftJoinWithRhsStreamMapped() throws Exception {
-        produceMessages();
-
-        ValueJoiner<String, Integer, String> joiner = new ValueJoiner<String, Integer, String>() {
-            @Override
-            public String apply(String value1, Integer value2) {
-                return value1 + ":" + value2;
-            }
-        };
-        streamTwo
-            .leftJoin(streamOne.map(keyMapper),
-                      joiner,
-                      JoinWindows.of("the-join").within(60 * 1000),
+        String outputTopic = "left-join";
+        map1.leftJoin(map2,
+                      valueJoiner,
+                      JoinWindows.of("the-left-join").within(60 * 1000),
                       Serdes.Integer(),
-                      null,
-                      Serdes.Integer())
-            .to(Serdes.Integer(), Serdes.String(), outputTopic);
-
-        startStreams();
-        List<String> received = receiveMessages(new StringDeserializer(), 5);
-
-        List<String> expectedMessages = Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5");
-        if (!received.equals(expectedMessages)) {
-            produceStreamTwoInputTo(streamTwoInput);
-            verifyCorrectOutput(expectedMessages);
-        }
-    }
-
-    @Test
-    public void shouldLeftJoinWithKTableAfterMap() throws Exception {
-        produceMessages();
-        streamOne.map(keyMapper)
-            .leftJoin(kTable, valueJoiner, Serdes.Integer(), Serdes.Integer())
-            .to(Serdes.Integer(), Serdes.String(), outputTopic);
-
-        startStreams();
-
-        List<String> received = receiveMessages(new StringDeserializer(), 5);
-        assertThat(received, is(expectedStreamOneTwoJoin));
-    }
-
-    @Test
-    public void shouldLeftJoinWithTableProducedFromGroupBy() throws Exception {
-        produceMessages();
-        KTable<Integer, String> aggTable =
-            streamOne.map(keyMapper)
-                .groupByKey(Serdes.Integer(), Serdes.Integer())
-                .aggregate(new Initializer<String>() {
-                    @Override
-                    public String apply() {
-                        return "";
-                    }
-                }, new Aggregator<Integer, Integer, String>() {
-                    @Override
-                    public String apply(final Integer aggKey, final Integer value,
-                                        final String aggregate) {
-                        return aggregate + ":" + value;
-                    }
-                }, Serdes.String(), "agg-by-key");
-
-        streamTwo.leftJoin(aggTable, new ValueJoiner<String, String, String>() {
-            @Override
-            public String apply(final String value1, final String value2) {
-                return value1 + "@" + value2;
-            }
-        }, Serdes.Integer(), Serdes.String())
+                      Serdes.Integer(),
+                      Serdes.String())
             .to(Serdes.Integer(), Serdes.String(), outputTopic);
 
-        startStreams();
-
-        receiveMessages(new StringDeserializer(), 5);
-        produceStreamTwoInputTo(streamTwoInput);
-        List<String> received = receiveMessages(new StringDeserializer(), 5);
-
-        assertThat(received, is(Arrays.asList("A@:1", "B@:2", "C@:3", "D@:4", "E@:5")));
-
+        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
 
-
-    @Test
-    public void shouldJoinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws Exception {
-        produceMessages();
-
+    private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws
+                                                                                   Exception {
         final KStream<Integer, Integer>
             map1 =
             streamOne.map(keyMapper);
@@ -387,7 +282,7 @@ public class KStreamRepartitionJoinTest {
 
         final KStream<Integer, String> join = map1.join(map2,
                                                         valueJoiner,
-                                                        JoinWindows.of("the-join")
+                                                        JoinWindows.of("join-one")
                                                             .within(60 * 1000),
                                                         Serdes.Integer(),
                                                         Serdes.Integer(),
@@ -399,6 +294,7 @@ public class KStreamRepartitionJoinTest {
                 return value1 + ":" + value2;
             }
         };
+        String topic = "map-join-join";
         join.map(kvMapper)
             .join(streamFour.map(kvMapper),
                   joiner,
@@ -406,56 +302,50 @@ public class KStreamRepartitionJoinTest {
                   Serdes.Integer(),
                   Serdes.String(),
                   Serdes.String())
-            .to(Serdes.Integer(), Serdes.String(), outputTopic);
+            .to(Serdes.Integer(), Serdes.String(), topic);
 
-        startStreams();
-        verifyCorrectOutput(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E"));
+
+        return new ExpectedOutputOnTopic(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E"),
+                            topic);
     }
 
-    @Test
-    public void shouldFilterNullKeysWhenRepartionedOnJoin() throws Exception {
-        produceMessages();
-        IntegrationTestUtils.produceKeyValuesSynchronously(
-            streamOneInput,
-            Collections.singleton(
-                new KeyValue<Long, Integer>(70L, null)),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                LongSerializer.class,
-                IntegerSerializer.class,
-                new Properties()));
 
-        doJoin(streamOne.map(keyMapper), streamTwo);
-        startStreams();
-        verifyCorrectOutput(expectedStreamOneTwoJoin);
+    private class ExpectedOutputOnTopic {
+        private final List<String> expectedOutput;
+        private final String outputTopic;
+
+        ExpectedOutputOnTopic(final List<String> expectedOutput, final String outputTopic) {
+            this.expectedOutput = expectedOutput;
+            this.outputTopic = outputTopic;
+        }
+    }
+
+
+    private void verifyCorrectOutput(final ExpectedOutputOnTopic expectedOutputOnTopic)
+        throws InterruptedException {
+        assertThat(receiveMessages(new StringDeserializer(),
+                                   expectedOutputOnTopic.expectedOutput.size(),
+                                   expectedOutputOnTopic.outputTopic),
+                   is(expectedOutputOnTopic.expectedOutput));
+    }
+    private void verifyLeftJoin(ExpectedOutputOnTopic expectedOutputOnTopic)
+        throws InterruptedException, ExecutionException {
+        List<String> received = receiveMessages(new StringDeserializer(), expectedOutputOnTopic
+            .expectedOutput.size(), expectedOutputOnTopic.outputTopic);
+        if (!received.equals(expectedOutputOnTopic.expectedOutput)) {
+            produceToStreamOne();
+            verifyCorrectOutput(expectedOutputOnTopic.expectedOutput, expectedOutputOnTopic.outputTopic);
+        }
     }
 
     private void produceMessages()
         throws ExecutionException, InterruptedException {
         produceToStreamOne();
         produceStreamTwoInputTo(streamTwoInput);
-        produceToStreamThree();
-        produceStreamTwoInputTo(tableInput);
         produceStreamTwoInputTo(streamFourInput);
 
     }
 
-    private void produceToStreamThree()
-        throws ExecutionException, InterruptedException {
-        IntegrationTestUtils.produceKeyValuesSynchronously(
-            streamThreeInput,
-            Arrays.asList(
-                new KeyValue<>(1, 10),
-                new KeyValue<>(2, 20),
-                new KeyValue<>(3, 30),
-                new KeyValue<>(4, 40),
-                new KeyValue<>(5, 50)),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                IntegerSerializer.class,
-                IntegerSerializer.class,
-                new Properties()));
-    }
 
     private void produceStreamTwoInputTo(final String streamTwoInput)
         throws ExecutionException, InterruptedException {
@@ -483,7 +373,8 @@ public class KStreamRepartitionJoinTest {
                 new KeyValue<>(5L, 2),
                 new KeyValue<>(12L, 3),
                 new KeyValue<>(15L, 4),
-                new KeyValue<>(20L, 5)),
+                new KeyValue<>(20L, 5),
+                new KeyValue<Long, Integer>(70L, null)), // nulls should be filtered
             TestUtils.producerConfig(
                 CLUSTER.bootstrapServers(),
                 LongSerializer.class,
@@ -492,18 +383,12 @@ public class KStreamRepartitionJoinTest {
     }
 
     private void createTopics() {
-        streamOneInput = "stream-one-" + testNo;
-        streamTwoInput = "stream-two-" + testNo;
-        streamThreeInput = "stream-three-" + testNo;
-        streamFourInput = "stream-four-" + testNo;
-        tableInput = "table-stream-two-" + testNo;
-        outputTopic = "output-" + testNo;
+        streamOneInput = "stream-one";
+        streamTwoInput = "stream-two";
+        streamFourInput = "stream-four";
         CLUSTER.createTopic(streamOneInput);
         CLUSTER.createTopic(streamTwoInput, 2, 1);
-        CLUSTER.createTopic(streamThreeInput, 2, 1);
         CLUSTER.createTopic(streamFourInput);
-        CLUSTER.createTopic(tableInput, 2, 1);
-        CLUSTER.createTopic(outputTopic);
     }
 
 
@@ -514,20 +399,20 @@ public class KStreamRepartitionJoinTest {
 
 
     private List<String> receiveMessages(final Deserializer<?> valueDeserializer,
-                                         final int numMessages) throws InterruptedException {
+                                         final int numMessages, final String topic) throws InterruptedException {
 
         final Properties config = new Properties();
 
         config
             .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test-" + testNo);
+        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test");
         config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                                        IntegerDeserializer.class.getName());
         config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                                        valueDeserializer.getClass().getName());
         List<String> received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(config,
-                                                                                      outputTopic,
+                                                                                      topic,
                                                                                       numMessages,
                                                                                       60 *
                                                                                       1000);
@@ -535,31 +420,24 @@ public class KStreamRepartitionJoinTest {
         return received;
     }
 
-    private void verifyCorrectOutput(List<String> expectedMessages) throws InterruptedException {
-        assertThat(receiveMessages(new StringDeserializer(), expectedMessages.size()),
+    private void verifyCorrectOutput(List<String> expectedMessages,
+                                     final String topic) throws InterruptedException {
+        assertThat(receiveMessages(new StringDeserializer(), expectedMessages.size(), topic),
                    is(expectedMessages));
     }
 
     private void doJoin(KStream<Integer, Integer> lhs,
-                        KStream<Integer, String> rhs) {
+                        KStream<Integer, String> rhs,
+                        String outputTopic,
+                        final String joinName) {
+        CLUSTER.createTopic(outputTopic);
         lhs.join(rhs,
                  valueJoiner,
-                 JoinWindows.of("the-join").within(60 * 1000),
+                 JoinWindows.of(joinName).within(60 * 1000),
                  Serdes.Integer(),
                  Serdes.Integer(),
                  Serdes.String())
             .to(Serdes.Integer(), Serdes.String(), outputTopic);
     }
 
-    private void doLeftJoin(KStream<Integer, Integer> lhs,
-                            KStream<Integer, String> rhs) {
-        lhs.leftJoin(rhs,
-                     valueJoiner,
-                     JoinWindows.of("the-join").within(60 * 1000),
-                     Serdes.Integer(),
-                     Serdes.Integer(),
-                     Serdes.String())
-            .to(Serdes.Integer(), Serdes.String(), outputTopic);
-    }
-
 }
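
The KStreamRepartitionJoinTest rewrite above boils down to one pattern: wire every join variant
into a single topology up front, start the streams application once, then verify each variant's
output topic. A hypothetical, stripped-down skeleton of that shape (these names are placeholders,
not the test's real methods):

    import java.util.Arrays;
    import java.util.List;

    public class OneStartupManyVerifications {

        // Placeholder builders: in the real test each of these wires one join variant
        // into the shared topology and returns the output topic it writes to.
        static String buildJoinVariantA() { return "output-a"; }
        static String buildJoinVariantB() { return "output-b"; }

        // Placeholder for the single streams startup that now covers all variants.
        static void startStreamsApplication() { }

        // Placeholder verification: the real test consumes the topic and asserts on the records.
        static void verifyTopic(final String topic) {
            System.out.println("verifying " + topic);
        }

        public static void main(final String[] args) {
            final List<String> outputTopics = Arrays.asList(buildJoinVariantA(), buildJoinVariantB());
            startStreamsApplication();
            for (final String topic : outputTopics) {
                verifyTopic(topic);
            }
        }
    }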

http://git-wip-us.apache.org/repos/asf/kafka/blob/44c8308a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 7e18cff..cf48391 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -313,7 +313,6 @@ public class RegexSourceIntegrationTest {
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
         return streamsConfiguration;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/44c8308a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
index 2966590..9692cda 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
@@ -91,7 +91,7 @@ public class WordCountIntegrationTest {
         // StreamsConfig configuration (so we can retrieve whatever state directory Streams came up
         // with automatically) we don't need to set this anymore and can update `purgeLocalStreamsState`
         // accordingly.
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kstreams-word-count");
 
         KStreamBuilder builder = new KStreamBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/44c8308a/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java
index 70e6cf6..f348fc9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java
@@ -38,7 +38,7 @@ public class StateTestUtils {
      */
     public static File tempDir() {
         try {
-            final File dir = Files.createTempDirectory("test").toFile();
+            final File dir = Files.createTempDirectory(new File("/tmp").toPath(), "test").toFile();
             dir.mkdirs();
             dir.deleteOnExit();
 
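
For anyone unfamiliar with the two-argument overload used here: passing a parent Path pins the
temporary directory under /tmp rather than the platform's java.io.tmpdir. A quick standalone
illustration (assumes /tmp exists and is writable, as on typical Linux/macOS hosts):

    import java.io.File;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class TempDirDemo {
        public static void main(final String[] args) throws Exception {
            // Old behaviour: directory created under the default java.io.tmpdir.
            final File defaultTmp = Files.createTempDirectory("test").toFile();

            // New behaviour: directory created under an explicit parent, here /tmp.
            final File pinnedTmp = Files.createTempDirectory(Paths.get("/tmp"), "test").toFile();

            defaultTmp.deleteOnExit();
            pinnedTmp.deleteOnExit();

            System.out.println(defaultTmp.getAbsolutePath());
            System.out.println(pinnedTmp.getAbsolutePath());
        }
    }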

