kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/2] kafka git commit: MINOR: changes embedded broker time to MockTime
Date Tue, 06 Sep 2016 22:35:16 GMT
MINOR: changes embedded broker time to MockTime

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy, Ismael Juma, Guozhang Wang

Closes #1808 from mjsax/mockTime


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/de1b853c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/de1b853c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/de1b853c

Branch: refs/heads/trunk
Commit: de1b853c3ed326cf296a56538ca9570b0ecc0636
Parents: ed639e8
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Tue Sep 6 15:35:12 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Sep 6 15:35:12 2016 -0700

----------------------------------------------------------------------
 .../integration/FanoutIntegrationTest.java      |  56 ++---
 .../InternalTopicIntegrationTest.java           |  66 +++---
 .../KStreamAggregationIntegrationTest.java      | 127 ++++++------
 .../KStreamKTableJoinIntegrationTest.java       |  58 +++---
 .../integration/KStreamRepartitionJoinTest.java | 167 +++++++--------
 .../QueryableStateIntegrationTest.java          | 203 ++++++++++---------
 .../integration/RegexSourceIntegrationTest.java | 127 ++++++------
 .../integration/utils/EmbeddedKafkaCluster.java |  35 ++--
 .../integration/utils/IntegrationTestUtils.java | 124 +++++------
 .../integration/utils/KafkaEmbedded.java        |  86 ++++----
 10 files changed, 538 insertions(+), 511 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 56cba58..efc427a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.integration;
 
 
+import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -26,31 +27,31 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Properties;
 
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 /**
  * End-to-end integration test that demonstrates "fan-out", using an embedded Kafka cluster.
- *
+ * <p>
  * This example shows how you can read from one input topic/stream, transform the data (here:
  * trivially) in two different ways via two intermediate streams, and then write the respective
  * results to two output topics.
- *
+ * <p>
  * <pre>
  * {@code
  *
@@ -67,6 +68,7 @@ public class FanoutIntegrationTest {
     private static final int NUM_BROKERS = 1;
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
     private static final String INPUT_TOPIC_A = "A";
     private static final String OUTPUT_TOPIC_B = "B";
     private static final String OUTPUT_TOPIC_C = "C";
@@ -80,10 +82,10 @@ public class FanoutIntegrationTest {
 
     @Test
     public void shouldFanoutTheInput() throws Exception {
-        List<String> inputValues = Arrays.asList("Hello", "World");
-        List<String> expectedValuesForB = new ArrayList<>();
-        List<String> expectedValuesForC = new ArrayList<>();
-        for (String input : inputValues) {
+        final List<String> inputValues = Arrays.asList("Hello", "World");
+        final List<String> expectedValuesForB = new ArrayList<>();
+        final List<String> expectedValuesForC = new ArrayList<>();
+        for (final String input : inputValues) {
             expectedValuesForB.add(input.toUpperCase(Locale.getDefault()));
             expectedValuesForC.add(input.toLowerCase(Locale.getDefault()));
         }
@@ -91,73 +93,73 @@ public class FanoutIntegrationTest {
         //
         // Step 1: Configure and start the processor topology.
         //
-        KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-        Properties streamsConfiguration = new Properties();
+        final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-integration-test");
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
-        KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);
-        KStream<byte[], String> stream2 = stream1.mapValues(
+        final KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);
+        final KStream<byte[], String> stream2 = stream1.mapValues(
             new ValueMapper<String, String>() {
                 @Override
-                public String apply(String value) {
+                public String apply(final String value) {
                     return value.toUpperCase(Locale.getDefault());
                 }
             });
-        KStream<byte[], String> stream3 = stream1.mapValues(
+        final KStream<byte[], String> stream3 = stream1.mapValues(
             new ValueMapper<String, String>() {
                 @Override
-                public String apply(String value) {
+                public String apply(final String value) {
                     return value.toLowerCase(Locale.getDefault());
                 }
             });
         stream2.to(OUTPUT_TOPIC_B);
         stream3.to(OUTPUT_TOPIC_C);
 
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
         //
         // Step 2: Produce some input data to the input topic.
         //
-        Properties producerConfig = new Properties();
+        final Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
         producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
         producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        IntegrationTestUtils.produceValuesSynchronously(INPUT_TOPIC_A, inputValues, producerConfig);
+        IntegrationTestUtils.produceValuesSynchronously(INPUT_TOPIC_A, inputValues, producerConfig, mockTime);
 
         //
         // Step 3: Verify the application's output data.
         //
 
         // Verify output topic B
-        Properties consumerConfigB = new Properties();
+        final Properties consumerConfigB = new Properties();
         consumerConfigB.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerConfigB.put(ConsumerConfig.GROUP_ID_CONFIG, "fanout-integration-test-standard-consumer-topicB");
         consumerConfigB.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerConfigB.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         consumerConfigB.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        List<String> actualValuesForB = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigB,
+        final List<String> actualValuesForB = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigB,
             OUTPUT_TOPIC_B, inputValues.size());
         assertThat(actualValuesForB, equalTo(expectedValuesForB));
 
         // Verify output topic C
-        Properties consumerConfigC = new Properties();
+        final Properties consumerConfigC = new Properties();
         consumerConfigC.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerConfigC.put(ConsumerConfig.GROUP_ID_CONFIG, "fanout-integration-test-standard-consumer-topicC");
         consumerConfigC.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerConfigC.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         consumerConfigC.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        List<String> actualValuesForC = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigC,
+        final List<String> actualValuesForC = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigC,
             OUTPUT_TOPIC_C, inputValues.size());
         streams.close();
         assertThat(actualValuesForC, equalTo(expectedValuesForC));
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index f88c1b2..b9a1cf6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,11 @@
 package org.apache.kafka.streams.integration;
 
 
+import kafka.admin.AdminUtils;
+import kafka.log.LogConfig;
+import kafka.utils.MockTime;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -24,10 +29,10 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -37,19 +42,16 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.Map;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Properties;
 
-import kafka.admin.AdminUtils;
-import kafka.log.LogConfig;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import scala.Tuple2;
-import scala.collection.Iterator;
-import scala.collection.Map;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests related to internal topics in streams
@@ -58,6 +60,7 @@ public class InternalTopicIntegrationTest {
     private static final int NUM_BROKERS = 1;
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
     private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
     private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
@@ -71,6 +74,7 @@ public class InternalTopicIntegrationTest {
 
     /**
      * Validates that any state changelog topics are compacted
+     *
      * @return true if topics have a valid config, false otherwise
      */
     private boolean isUsingCompactionForStateChangelogTopics() {
@@ -80,20 +84,20 @@ public class InternalTopicIntegrationTest {
         // createTopic() will only seem to work (it will return without error).  The topic will exist in
         // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
         // topic.
-        ZkClient zkClient = new ZkClient(
+        final ZkClient zkClient = new ZkClient(
             CLUSTER.zKConnectString(),
             DEFAULT_ZK_SESSION_TIMEOUT_MS,
             DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
             ZKStringSerializer$.MODULE$);
-        boolean isSecure = false;
-        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(CLUSTER.zKConnectString()), isSecure);
+        final boolean isSecure = false;
+        final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(CLUSTER.zKConnectString()), isSecure);
 
-        Map<String, Properties> topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils);
-        Iterator it = topicConfigs.iterator();
+        final Map<String, Properties> topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils);
+        final Iterator it = topicConfigs.iterator();
         while (it.hasNext()) {
-            Tuple2<String, Properties> topicConfig = (Tuple2<String, Properties>) it.next();
-            String topic = topicConfig._1;
-            Properties prop = topicConfig._2;
+            final Tuple2<String, Properties> topicConfig = (Tuple2<String, Properties>) it.next();
+            final String topic = topicConfig._1;
+            final Properties prop = topicConfig._2;
 
             // state changelogs should be compacted
             if (topic.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) {
@@ -110,7 +114,7 @@ public class InternalTopicIntegrationTest {
 
     @Test
     public void shouldCompactTopicsForStateChangelogs() throws Exception {
-        List<String> inputValues = Arrays.asList("hello", "world", "world", "hello world");
+        final List<String> inputValues = Arrays.asList("hello", "world", "world", "hello world");
 
         //
         // Step 1: Configure and start a simple word count topology
@@ -118,7 +122,7 @@ public class InternalTopicIntegrationTest {
         final Serde<String> stringSerde = Serdes.String();
         final Serde<Long> longSerde = Serdes.Long();
 
-        Properties streamsConfiguration = new Properties();
+        final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "compact-topics-integration-test");
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
@@ -126,37 +130,37 @@ public class InternalTopicIntegrationTest {
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
+        final KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
 
-        KStream<String, Long> wordCounts = textLines
+        final KStream<String, Long> wordCounts = textLines
             .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                 @Override
-                public Iterable<String> apply(String value) {
+                public Iterable<String> apply(final String value) {
                     return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                 }
             }).groupBy(MockKeyValueMapper.<String, String>SelectValueMapper())
-                .count("Counts").toStream();
+            .count("Counts").toStream();
 
         wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
 
         // Remove any state from previous test runs
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
 
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
         //
         // Step 2: Produce some input data to the input topic.
         //
-        Properties producerConfig = new Properties();
+        final Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
         producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
         producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);
+        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig, mockTime);
 
         //
         // Step 3: Verify the state changelog topics are compact

http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 17e197c..6da2a95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -3,13 +3,18 @@
  * agreements.  See the NOTICE file distributed with this work for additional information regarding
  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.  You may obtain a
- * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by
- * applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
- * the License for the specific language governing permissions and limitations under the License.
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  */
 package org.apache.kafka.streams.integration;
 
+import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -52,10 +57,11 @@ import static org.hamcrest.core.Is.is;
 
 public class KStreamAggregationIntegrationTest {
     private static final int NUM_BROKERS = 1;
+
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER =
-        new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private static volatile int testNo = 0;
+    private final MockTime mockTime = CLUSTER.time;
     private KStreamBuilder builder;
     private Properties streamsConfiguration;
     private KafkaStreams kafkaStreams;
@@ -74,8 +80,7 @@ public class KStreamAggregationIntegrationTest {
         builder = new KStreamBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        String applicationId = "kgrouped-stream-test-" +
-                       testNo;
+        final String applicationId = "kgrouped-stream-test-" + testNo;
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration
             .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -83,7 +88,7 @@ public class KStreamAggregationIntegrationTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
 
-        KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.<Integer, String>SelectValueMapper();
+        final KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.SelectValueMapper();
         stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);
         groupedStream = stream
             .groupBy(
@@ -93,7 +98,7 @@ public class KStreamAggregationIntegrationTest {
 
         reducer = new Reducer<String>() {
             @Override
-            public String apply(String value1, String value2) {
+            public String apply(final String value1, final String value2) {
                 return value1 + ":" + value2;
             }
         };
@@ -105,7 +110,7 @@ public class KStreamAggregationIntegrationTest {
         };
         aggregator = new Aggregator<String, String, Integer>() {
             @Override
-            public Integer apply(String aggKey, String value, Integer aggregate) {
+            public Integer apply(final String aggKey, final String value, final Integer aggregate) {
                 return aggregate + value.length();
             }
         };
@@ -122,40 +127,39 @@ public class KStreamAggregationIntegrationTest {
 
     @Test
     public void shouldReduce() throws Exception {
-        produceMessages(System.currentTimeMillis());
+        produceMessages(mockTime.milliseconds());
         groupedStream
             .reduce(reducer, "reduce-by-key")
             .to(Serdes.String(), Serdes.String(), outputTopic);
 
         startStreams();
 
-        produceMessages(System.currentTimeMillis());
+        produceMessages(mockTime.milliseconds());
 
-        List<KeyValue<String, String>> results = receiveMessages(
+        final List<KeyValue<String, String>> results = receiveMessages(
             new StringDeserializer(),
             new StringDeserializer()
             , 10);
 
         Collections.sort(results, new Comparator<KeyValue<String, String>>() {
             @Override
-            public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) {
+            public int compare(final KeyValue<String, String> o1, final KeyValue<String, String> o2) {
                 return KStreamAggregationIntegrationTest.compare(o1, o2);
             }
         });
 
         assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
-                                             KeyValue.pair("A", "A:A"),
-                                             KeyValue.pair("B", "B"),
-                                             KeyValue.pair("B", "B:B"),
-                                             KeyValue.pair("C", "C"),
-                                             KeyValue.pair("C", "C:C"),
-                                             KeyValue.pair("D", "D"),
-                                             KeyValue.pair("D", "D:D"),
-                                             KeyValue.pair("E", "E"),
-                                             KeyValue.pair("E", "E:E"))));
+            KeyValue.pair("A", "A:A"),
+            KeyValue.pair("B", "B"),
+            KeyValue.pair("B", "B:B"),
+            KeyValue.pair("C", "C"),
+            KeyValue.pair("C", "C:C"),
+            KeyValue.pair("D", "D"),
+            KeyValue.pair("D", "D:D"),
+            KeyValue.pair("E", "E"),
+            KeyValue.pair("E", "E:E"))));
     }
 
-    @SuppressWarnings("unchecked")
     private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1,
                                                                             final KeyValue<K, V> o2) {
         final int keyComparison = o1.key.compareTo(o2.key);
@@ -167,9 +171,10 @@ public class KStreamAggregationIntegrationTest {
 
     @Test
     public void shouldReduceWindowed() throws Exception {
-        long firstBatchTimestamp = System.currentTimeMillis() - 1000;
+        final long firstBatchTimestamp = mockTime.milliseconds();
+        mockTime.sleep(1000);
         produceMessages(firstBatchTimestamp);
-        long secondBatchTimestamp = System.currentTimeMillis();
+        final long secondBatchTimestamp = mockTime.milliseconds();
         produceMessages(secondBatchTimestamp);
         produceMessages(secondBatchTimestamp);
 
@@ -177,7 +182,7 @@ public class KStreamAggregationIntegrationTest {
             .reduce(reducer, TimeWindows.of(500L), "reduce-time-windows")
             .toStream(new KeyValueMapper<Windowed<String>, String, String>() {
                 @Override
-                public String apply(Windowed<String> windowedKey, String value) {
+                public String apply(final Windowed<String> windowedKey, final String value) {
                     return windowedKey.key() + "@" + windowedKey.window().start();
                 }
             })
@@ -185,12 +190,12 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
 
-        List<KeyValue<String, String>> windowedOutput = receiveMessages(
+        final List<KeyValue<String, String>> windowedOutput = receiveMessages(
             new StringDeserializer(),
             new StringDeserializer()
             , 15);
 
-        Comparator<KeyValue<String, String>>
+        final Comparator<KeyValue<String, String>>
             comparator =
             new Comparator<KeyValue<String, String>>() {
                 @Override
@@ -201,8 +206,8 @@ public class KStreamAggregationIntegrationTest {
             };
 
         Collections.sort(windowedOutput, comparator);
-        long firstBatchWindow = firstBatchTimestamp / 500 * 500;
-        long secondBatchWindow = secondBatchTimestamp / 500 * 500;
+        final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
+        final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
 
         assertThat(windowedOutput, is(
             Arrays.asList(
@@ -227,7 +232,7 @@ public class KStreamAggregationIntegrationTest {
 
     @Test
     public void shouldAggregate() throws Exception {
-        produceMessages(System.currentTimeMillis());
+        produceMessages(mockTime.milliseconds());
         groupedStream.aggregate(
             initializer,
             aggregator,
@@ -237,16 +242,16 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
 
-        produceMessages(System.currentTimeMillis());
+        produceMessages(mockTime.milliseconds());
 
-        List<KeyValue<String, Integer>> results = receiveMessages(
+        final List<KeyValue<String, Integer>> results = receiveMessages(
             new StringDeserializer(),
             new IntegerDeserializer()
             , 10);
 
         Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
             @Override
-            public int compare(KeyValue<String, Integer> o1, KeyValue<String, Integer> o2) {
+            public int compare(final KeyValue<String, Integer> o1, final KeyValue<String, Integer> o2) {
                 return KStreamAggregationIntegrationTest.compare(o1, o2);
             }
         });
@@ -267,9 +272,10 @@ public class KStreamAggregationIntegrationTest {
 
     @Test
     public void shouldAggregateWindowed() throws Exception {
-        long firstTimestamp = System.currentTimeMillis() - 1000;
+        final long firstTimestamp = mockTime.milliseconds();
+        mockTime.sleep(1000);
         produceMessages(firstTimestamp);
-        long secondTimestamp = System.currentTimeMillis();
+        final long secondTimestamp = mockTime.milliseconds();
         produceMessages(secondTimestamp);
         produceMessages(secondTimestamp);
 
@@ -280,7 +286,7 @@ public class KStreamAggregationIntegrationTest {
             Serdes.Integer(), "aggregate-by-key-windowed")
             .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
                 @Override
-                public String apply(Windowed<String> windowedKey, Integer value) {
+                public String apply(final Windowed<String> windowedKey, final Integer value) {
                     return windowedKey.key() + "@" + windowedKey.window().start();
                 }
             })
@@ -288,12 +294,12 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
 
-        List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
+        final List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
             new StringDeserializer(),
             new IntegerDeserializer()
             , 15);
 
-        Comparator<KeyValue<String, Integer>>
+        final Comparator<KeyValue<String, Integer>>
             comparator =
             new Comparator<KeyValue<String, Integer>>() {
                 @Override
@@ -305,8 +311,8 @@ public class KStreamAggregationIntegrationTest {
 
         Collections.sort(windowedMessages, comparator);
 
-        long firstWindow = firstTimestamp / 500 * 500;
-        long secondWindow = secondTimestamp / 500 * 500;
+        final long firstWindow = firstTimestamp / 500 * 500;
+        final long secondWindow = secondTimestamp / 500 * 500;
 
         assertThat(windowedMessages, is(
             Arrays.asList(
@@ -330,22 +336,22 @@ public class KStreamAggregationIntegrationTest {
 
     @Test
     public void shouldCount() throws Exception {
-        produceMessages(System.currentTimeMillis());
+        produceMessages(mockTime.milliseconds());
 
         groupedStream.count("count-by-key")
             .to(Serdes.String(), Serdes.Long(), outputTopic);
 
         startStreams();
 
-        produceMessages(System.currentTimeMillis());
+        produceMessages(mockTime.milliseconds());
 
-        List<KeyValue<String, Long>> results = receiveMessages(
+        final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
             new LongDeserializer()
             , 10);
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
-            public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
+            public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
                 return KStreamAggregationIntegrationTest.compare(o1, o2);
             }
         });
@@ -366,7 +372,7 @@ public class KStreamAggregationIntegrationTest {
 
     @Test
     public void shouldGroupByKey() throws Exception {
-        long timestamp = System.currentTimeMillis();
+        final long timestamp = mockTime.milliseconds();
         produceMessages(timestamp);
         produceMessages(timestamp);
 
@@ -381,18 +387,18 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
 
-        List<KeyValue<String, Long>> results = receiveMessages(
+        final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
             new LongDeserializer()
             , 10);
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
-            public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
+            public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
                 return KStreamAggregationIntegrationTest.compare(o1, o2);
             }
         });
 
-        long window = timestamp / 500 * 500;
+        final long window = timestamp / 500 * 500;
         assertThat(results, is(Arrays.asList(
             KeyValue.pair("1@" + window, 1L),
             KeyValue.pair("1@" + window, 2L),
@@ -409,7 +415,7 @@ public class KStreamAggregationIntegrationTest {
     }
 
 
-    private void produceMessages(long timestamp)
+    private void produceMessages(final long timestamp)
         throws ExecutionException, InterruptedException {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             streamOneInput,
@@ -450,16 +456,15 @@ public class KStreamAggregationIntegrationTest {
         final Properties consumerProperties = new Properties();
         consumerProperties
             .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" +
-                                                                       testNo);
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
         consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                                       keyDeserializer.getClass().getName());
-        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                                       valueDeserializer.getClass().getName());
-        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties,
-                                                                        outputTopic,
-                                                                        numMessages, 60 * 1000);
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
+        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            consumerProperties,
+            outputTopic,
+            numMessages,
+            60 * 1000);
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 536ad24..4a13482 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.  You may obtain a
  * copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -15,6 +15,7 @@
 package org.apache.kafka.streams.integration;
 
 
+import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.LongDeserializer;
@@ -26,6 +27,8 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -41,9 +44,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
@@ -52,10 +52,11 @@ import static org.junit.Assert.assertThat;
  * KTable (think: KStream.leftJoin(KTable)), i.e. an example of a stateful computation.
  */
 public class KStreamKTableJoinIntegrationTest {
-
     private static final int NUM_BROKERS = 1;
+
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
     private static final String USER_CLICKS_TOPIC = "user-clicks";
     private static final String USER_REGIONS_TOPIC = "user-regions";
     private static final String USER_REGIONS_STORE_NAME = "user-regions-store-name";
@@ -76,7 +77,7 @@ public class KStreamKTableJoinIntegrationTest {
         private final String region;
         private final long clicks;
 
-        public RegionWithClicks(String region, long clicks) {
+        public RegionWithClicks(final String region, final long clicks) {
             if (region == null || region.isEmpty()) {
                 throw new IllegalArgumentException("region must be set");
             }
@@ -100,7 +101,7 @@ public class KStreamKTableJoinIntegrationTest {
     @Test
     public void shouldCountClicksPerRegion() throws Exception {
         // Input 1: Clicks per user (multiple records allowed per user).
-        List<KeyValue<String, Long>> userClicks = Arrays.asList(
+        final List<KeyValue<String, Long>> userClicks = Arrays.asList(
             new KeyValue<>("alice", 13L),
             new KeyValue<>("bob", 4L),
             new KeyValue<>("chao", 25L),
@@ -112,7 +113,7 @@ public class KStreamKTableJoinIntegrationTest {
         );
 
         // Input 2: Region per user (multiple records allowed per user).
-        List<KeyValue<String, String>> userRegions = Arrays.asList(
+        final List<KeyValue<String, String>> userRegions = Arrays.asList(
             new KeyValue<>("alice", "asia"),   /* Alice lived in Asia originally... */
             new KeyValue<>("bob", "americas"),
             new KeyValue<>("chao", "asia"),
@@ -122,7 +123,7 @@ public class KStreamKTableJoinIntegrationTest {
             new KeyValue<>("fang", "asia")
         );
 
-        List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList(
+        final List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList(
             new KeyValue<>("europe", 13L),
             new KeyValue<>("americas", 4L),
             new KeyValue<>("asia", 25L),
@@ -139,26 +140,25 @@ public class KStreamKTableJoinIntegrationTest {
         final Serde<String> stringSerde = Serdes.String();
         final Serde<Long> longSerde = Serdes.Long();
 
-        Properties streamsConfiguration = new Properties();
+        final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test");
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
-                                 TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
 
         // Remove any state from previous test runs
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
 
-        KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
         // This KStream contains information such as "alice" -> 13L.
         //
         // Because this is a KStream ("record stream"), multiple records for the same user will be
         // considered as separate click-count events, each of which will be added to the total count.
-        KStream<String, Long> userClicksStream = builder.stream(stringSerde, longSerde, USER_CLICKS_TOPIC);
+        final KStream<String, Long> userClicksStream = builder.stream(stringSerde, longSerde, USER_CLICKS_TOPIC);
 
         // This KTable contains information such as "alice" -> "europe".
         //
@@ -171,14 +171,14 @@ public class KStreamKTableJoinIntegrationTest {
         // lived in "asia") because, at the time her first user-click record is being received and
         // subsequently processed in the `leftJoin`, the latest region update for "alice" is "europe"
         // (which overrides her previous region value of "asia").
-        KTable<String, String> userRegionsTable =
+        final KTable<String, String> userRegionsTable =
             builder.table(stringSerde, stringSerde, USER_REGIONS_TOPIC, USER_REGIONS_STORE_NAME);
 
         // Compute the number of clicks per region, e.g. "europe" -> 13L.
         //
         // The resulting KTable is continuously being updated as new data records are arriving in the
         // input KStream `userClicksStream` and input KTable `userRegionsTable`.
-        KTable<String, Long> clicksPerRegion = userClicksStream
+        final KTable<String, Long> clicksPerRegion = userClicksStream
             // Join the stream against the table.
             //
             // Null values possible: In general, null values are possible for region (i.e. the value of
@@ -192,14 +192,14 @@ public class KStreamKTableJoinIntegrationTest {
             // achieve the same effect.
             .leftJoin(userRegionsTable, new ValueJoiner<Long, String, RegionWithClicks>() {
                 @Override
-                public RegionWithClicks apply(Long clicks, String region) {
+                public RegionWithClicks apply(final Long clicks, final String region) {
                     return new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks);
                 }
             })
             // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
             .map(new KeyValueMapper<String, RegionWithClicks, KeyValue<String, Long>>() {
                 @Override
-                public KeyValue<String, Long> apply(String key, RegionWithClicks value) {
+                public KeyValue<String, Long> apply(final String key, final RegionWithClicks value) {
                     return new KeyValue<>(value.getRegion(), value.getClicks());
                 }
             })
@@ -207,7 +207,7 @@ public class KStreamKTableJoinIntegrationTest {
             .groupByKey(stringSerde, longSerde)
             .reduce(new Reducer<Long>() {
                 @Override
-                public Long apply(Long value1, Long value2) {
+                public Long apply(final Long value1, final Long value2) {
                     return value1 + value2;
                 }
             }, "ClicksPerRegionUnwindowed");
@@ -215,47 +215,47 @@ public class KStreamKTableJoinIntegrationTest {
         // Write the (continuously updating) results to the output topic.
         clicksPerRegion.to(stringSerde, longSerde, OUTPUT_TOPIC);
 
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
         //
         // Step 2: Publish user-region information.
         //
         // To keep this code example simple and easier to understand/reason about, we publish all
-        // user-region records before any user-click records (cf. step 3).  In practice though,
+        // user-region records before any user-click records (cf. step 3). In practice though,
         // data records would typically be arriving concurrently in both input streams/topics.
-        Properties userRegionsProducerConfig = new Properties();
+        final Properties userRegionsProducerConfig = new Properties();
         userRegionsProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         userRegionsProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
         userRegionsProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
         userRegionsProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         userRegionsProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        IntegrationTestUtils.produceKeyValuesSynchronously(USER_REGIONS_TOPIC, userRegions, userRegionsProducerConfig);
+        IntegrationTestUtils.produceKeyValuesSynchronously(USER_REGIONS_TOPIC, userRegions, userRegionsProducerConfig, mockTime);
 
         //
         // Step 3: Publish some user click events.
         //
-        Properties userClicksProducerConfig = new Properties();
+        final Properties userClicksProducerConfig = new Properties();
         userClicksProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         userClicksProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
         userClicksProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
         userClicksProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         userClicksProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
-        IntegrationTestUtils.produceKeyValuesSynchronously(USER_CLICKS_TOPIC, userClicks, userClicksProducerConfig);
+        IntegrationTestUtils.produceKeyValuesSynchronously(USER_CLICKS_TOPIC, userClicks, userClicksProducerConfig, mockTime);
 
         //
         // Step 4: Verify the application's output data.
         //
-        Properties consumerConfig = new Properties();
+        final Properties consumerConfig = new Properties();
         consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "join-integration-test-standard-consumer");
         consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
-        List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
+        final List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
             OUTPUT_TOPIC, expectedClicksPerRegion.size());
         streams.close();
         assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion));
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index de9c2c9..e9a7da1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -3,13 +3,18 @@
  * agreements.  See the NOTICE file distributed with this work for additional information regarding
  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.  You may obtain a
- * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by
- * applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
- * the License for the specific language governing permissions and limitations under the License.
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  */
 package org.apache.kafka.streams.integration;
 
+import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -48,10 +53,10 @@ import static org.hamcrest.core.Is.is;
 
 public class KStreamRepartitionJoinTest {
     private static final int NUM_BROKERS = 1;
-    @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER =
-        new EmbeddedKafkaCluster(NUM_BROKERS);
 
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
     private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
 
     private KStreamBuilder builder;
@@ -70,10 +75,9 @@ public class KStreamRepartitionJoinTest {
     private String streamFourInput;
 
 
-
     @Before
     public void before() {
-        String applicationId = "kstream-repartition-join-test";
+        final String applicationId = "kstream-repartition-join-test";
         builder = new KStreamBuilder();
         createTopics();
         streamsConfiguration = new Properties();
@@ -95,7 +99,7 @@ public class KStreamRepartitionJoinTest {
             }
         };
 
-        keyMapper = MockKeyValueMapper.<Long, Integer>SelectValueKeyValueMapper();
+        keyMapper = MockKeyValueMapper.SelectValueKeyValueMapper();
     }
 
     @After
@@ -132,8 +136,8 @@ public class KStreamRepartitionJoinTest {
     }
 
     private ExpectedOutputOnTopic mapStreamOneAndJoin() {
-        String mapOneStreamAndJoinOutput = "map-one-join-output";
-        doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput, "map-one-join");
+        final String mapOneStreamAndJoinOutput = "map-one-join-output";
+        doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput);
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput);
     }
 
@@ -141,7 +145,7 @@ public class KStreamRepartitionJoinTest {
         final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
         final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
 
-        doJoin(map1, map2, "map-both-streams-and-join", "map-both-join");
+        doJoin(map1, map2, "map-both-streams-and-join");
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join");
     }
 
@@ -150,7 +154,7 @@ public class KStreamRepartitionJoinTest {
         final KStream<Integer, Integer> mapMapStream = streamOne.map(
             new KeyValueMapper<Long, Integer, KeyValue<Long, Integer>>() {
                 @Override
-                public KeyValue<Long, Integer> apply(Long key, Integer value) {
+                public KeyValue<Long, Integer> apply(final Long key, final Integer value) {
                     if (value == null) {
                         return new KeyValue<>(null, null);
                     }
@@ -158,8 +162,8 @@ public class KStreamRepartitionJoinTest {
                 }
             }).map(keyMapper);
 
-        String outputTopic = "map-map-join";
-        doJoin(mapMapStream, streamTwo, outputTopic, outputTopic);
+        final String outputTopic = "map-map-join";
+        doJoin(mapMapStream, streamTwo, outputTopic);
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
 
@@ -167,10 +171,10 @@ public class KStreamRepartitionJoinTest {
     public ExpectedOutputOnTopic selectKeyAndJoin() throws ExecutionException, InterruptedException {
 
         final KStream<Integer, Integer> keySelected =
-                streamOne.selectKey(MockKeyValueMapper.<Long, Integer>SelectValueMapper());
+            streamOne.selectKey(MockKeyValueMapper.<Long, Integer>SelectValueMapper());
 
-        String outputTopic = "select-key-join";
-        doJoin(keySelected, streamTwo, outputTopic, outputTopic);
+        final String outputTopic = "select-key-join";
+        doJoin(keySelected, streamTwo, outputTopic);
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
 
@@ -179,38 +183,36 @@ public class KStreamRepartitionJoinTest {
         final KStream<Integer, Integer> flatMapped = streamOne.flatMap(
             new KeyValueMapper<Long, Integer, Iterable<KeyValue<Integer, Integer>>>() {
                 @Override
-                public Iterable<KeyValue<Integer, Integer>> apply(Long key,
-                                                                  Integer value) {
+                public Iterable<KeyValue<Integer, Integer>> apply(final Long key, final Integer value) {
                     return Collections.singletonList(new KeyValue<>(value, value));
                 }
             });
 
-        String outputTopic = "flat-map-join";
-        doJoin(flatMapped, streamTwo, outputTopic, outputTopic);
+        final String outputTopic = "flat-map-join";
+        doJoin(flatMapped, streamTwo, outputTopic);
 
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
 
     private ExpectedOutputOnTopic joinMappedRhsStream() throws Exception {
 
-        ValueJoiner<String, Integer, String> joiner = new ValueJoiner<String, Integer, String>() {
+        final ValueJoiner<String, Integer, String> joiner = new ValueJoiner<String, Integer, String>() {
             @Override
-            public String apply(String value1, Integer value2) {
+            public String apply(final String value1, final Integer value2) {
                 return value1 + ":" + value2;
             }
         };
-        String output = "join-rhs-stream-mapped";
+        final String output = "join-rhs-stream-mapped";
         streamTwo
             .join(streamOne.map(keyMapper),
-                  joiner,
-                  getJoinWindow(),
-                  Serdes.Integer(),
-                  Serdes.String(),
-                  Serdes.Integer())
+                joiner,
+                getJoinWindow(),
+                Serdes.Integer(),
+                Serdes.String(),
+                Serdes.Integer())
             .to(Serdes.Integer(), Serdes.String(), output);
 
-        return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"),
-                            output);
+        return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"), output);
     }
 
     public ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws Exception {
@@ -218,53 +220,51 @@ public class KStreamRepartitionJoinTest {
 
         final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
 
-        String outputTopic = "left-join";
+        final String outputTopic = "left-join";
         map1.leftJoin(map2,
-                      valueJoiner,
-                      getJoinWindow(),
-                      Serdes.Integer(),
-                      Serdes.Integer(),
-                      Serdes.String())
+            valueJoiner,
+            getJoinWindow(),
+            Serdes.Integer(),
+            Serdes.Integer(),
+            Serdes.String())
             .to(Serdes.Integer(), Serdes.String(), outputTopic);
 
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
 
-    private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws
-                                                                                   Exception {
+    private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws Exception {
         final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
 
         final KeyValueMapper<Integer, String, KeyValue<Integer, String>>
-            kvMapper = MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper();
+            kvMapper = MockKeyValueMapper.NoOpKeyValueMapper();
 
         final KStream<Integer, String> map2 = streamTwo.map(kvMapper);
 
         final KStream<Integer, String> join = map1.join(map2,
-                                                        valueJoiner,
-                                                        getJoinWindow(),
-                                                        Serdes.Integer(),
-                                                        Serdes.Integer(),
-                                                        Serdes.String());
+            valueJoiner,
+            getJoinWindow(),
+            Serdes.Integer(),
+            Serdes.Integer(),
+            Serdes.String());
 
-        ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
+        final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
             @Override
             public String apply(final String value1, final String value2) {
                 return value1 + ":" + value2;
             }
         };
-        String topic = "map-join-join";
+        final String topic = "map-join-join";
         join.map(kvMapper)
             .join(streamFour.map(kvMapper),
-                  joiner,
-                  getJoinWindow(),
-                  Serdes.Integer(),
-                  Serdes.String(),
-                  Serdes.String())
+                joiner,
+                getJoinWindow(),
+                Serdes.Integer(),
+                Serdes.String(),
+                Serdes.String())
             .to(Serdes.Integer(), Serdes.String(), topic);
 
 
-        return new ExpectedOutputOnTopic(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E"),
-                            topic);
+        return new ExpectedOutputOnTopic(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E"), topic);
     }
 
     private JoinWindows getJoinWindow() {
@@ -286,13 +286,14 @@ public class KStreamRepartitionJoinTest {
     private void verifyCorrectOutput(final ExpectedOutputOnTopic expectedOutputOnTopic)
         throws InterruptedException {
         assertThat(receiveMessages(new StringDeserializer(),
-                                   expectedOutputOnTopic.expectedOutput.size(),
-                                   expectedOutputOnTopic.outputTopic),
-                   is(expectedOutputOnTopic.expectedOutput));
+            expectedOutputOnTopic.expectedOutput.size(),
+            expectedOutputOnTopic.outputTopic),
+            is(expectedOutputOnTopic.expectedOutput));
     }
-    private void verifyLeftJoin(ExpectedOutputOnTopic expectedOutputOnTopic)
+
+    private void verifyLeftJoin(final ExpectedOutputOnTopic expectedOutputOnTopic)
         throws InterruptedException, ExecutionException {
-        List<String> received = receiveMessages(new StringDeserializer(), expectedOutputOnTopic
+        final List<String> received = receiveMessages(new StringDeserializer(), expectedOutputOnTopic
             .expectedOutput.size(), expectedOutputOnTopic.outputTopic);
         if (!received.equals(expectedOutputOnTopic.expectedOutput)) {
             produceToStreamOne();
@@ -323,7 +324,8 @@ public class KStreamRepartitionJoinTest {
                 CLUSTER.bootstrapServers(),
                 IntegerSerializer.class,
                 StringSerializer.class,
-                new Properties()));
+                new Properties()),
+            mockTime);
     }
 
     private void produceToStreamOne()
@@ -341,7 +343,8 @@ public class KStreamRepartitionJoinTest {
                 CLUSTER.bootstrapServers(),
                 LongSerializer.class,
                 IntegerSerializer.class,
-                new Properties()));
+                new Properties()),
+            mockTime);
     }
 
     private void createTopics() {
@@ -365,40 +368,38 @@ public class KStreamRepartitionJoinTest {
 
         final Properties config = new Properties();
 
-        config
-            .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test");
         config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                                       IntegerDeserializer.class.getName());
+            IntegerDeserializer.class.getName());
         config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                                       valueDeserializer.getClass().getName());
-        List<String> received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(config,
-                                                                                      topic,
-                                                                                      numMessages,
-                                                                                      60 *
-                                                                                      1000);
+            valueDeserializer.getClass().getName());
+        final List<String> received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
+            config,
+            topic,
+            numMessages,
+            60 * 1000);
         Collections.sort(received);
         return received;
     }
 
-    private void verifyCorrectOutput(List<String> expectedMessages,
+    private void verifyCorrectOutput(final List<String> expectedMessages,
                                      final String topic) throws InterruptedException {
         assertThat(receiveMessages(new StringDeserializer(), expectedMessages.size(), topic),
-                   is(expectedMessages));
+            is(expectedMessages));
     }
 
-    private void doJoin(KStream<Integer, Integer> lhs,
-                        KStream<Integer, String> rhs,
-                        String outputTopic,
-                        final String joinName) {
+    private void doJoin(final KStream<Integer, Integer> lhs,
+                        final KStream<Integer, String> rhs,
+                        final String outputTopic) {
         CLUSTER.createTopic(outputTopic);
         lhs.join(rhs,
-                 valueJoiner,
-                 getJoinWindow(),
-                 Serdes.Integer(),
-                 Serdes.Integer(),
-                 Serdes.String())
+            valueJoiner,
+            getJoinWindow(),
+            Serdes.Integer(),
+            Serdes.Integer(),
+            Serdes.String())
             .to(Serdes.Integer(), Serdes.String(), outputTopic);
     }
 


Mime
View raw message