kafka-commits mailing list archives

From guozh...@apache.org
Subject [1/2] kafka git commit: MINOR: changes embedded broker time to MockTime
Date Tue, 06 Sep 2016 22:35:15 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ed639e826 -> de1b853c3
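
Summary of the change, for readers skimming the diff below: this patch threads a shared kafka.utils.MockTime through the Streams integration-test stack. EmbeddedKafkaCluster now owns a MockTime, hands it to every embedded broker, and exposes it as CLUSTER.time; the produce helpers in IntegrationTestUtils take that clock, stamp each record with time.milliseconds(), and advance the clock by 1 ms per record, so record timestamps are strictly increasing and under test control rather than dependent on wall-clock scheduling. The rest of the diff is cosmetic cleanup (final qualifiers, import ordering, javadoc reflow) plus a rename of STREAM_THREE to STREAM_TWO in QueryableStateIntegrationTest.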


http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 02e8eb7..310b584 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.  You may obtain a
  * copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -14,6 +14,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -40,10 +41,8 @@ import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.StreamsMetadata;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.MockKeyValueMapper;
-import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.test.TestCondition;
-import static org.junit.Assert.fail;
-import static org.junit.Assert.assertTrue;
+import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -54,28 +53,30 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.Map;
-import java.util.HashMap;
 import java.util.TreeSet;
 
-
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class QueryableStateIntegrationTest {
     private static final int NUM_BROKERS = 2;
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER =
-        new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
     private static final String STREAM_ONE = "stream-one";
     private static final String STREAM_CONCURRENT = "stream-concurrent";
     private static final String OUTPUT_TOPIC = "output";
     private static final String OUTPUT_TOPIC_CONCURRENT = "output-concurrent";
-    private static final String STREAM_THREE = "stream-three";
+    private static final String STREAM_TWO = "stream-two";
     private static final int NUM_PARTITIONS = NUM_BROKERS;
     private static final int NUM_REPLICAS = NUM_BROKERS;
     private static final long WINDOW_SIZE = 60000L;
@@ -91,7 +92,7 @@ public class QueryableStateIntegrationTest {
     public static void createTopics() {
         CLUSTER.createTopic(STREAM_ONE);
         CLUSTER.createTopic(STREAM_CONCURRENT);
-        CLUSTER.createTopic(STREAM_THREE, NUM_PARTITIONS, NUM_REPLICAS);
+        CLUSTER.createTopic(STREAM_TWO, NUM_PARTITIONS, NUM_REPLICAS);
         CLUSTER.createTopic(OUTPUT_TOPIC);
         CLUSTER.createTopic(OUTPUT_TOPIC_CONCURRENT);
         CLUSTER.createTopic(OUTPUT_TOPIC_THREE);
@@ -107,12 +108,9 @@ public class QueryableStateIntegrationTest {
             .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
-                                 TestUtils.tempDirectory("qs-test")
-                                     .getPath());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("qs-test").getPath());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsConfiguration
-            .put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
         stringComparator = new Comparator<KeyValue<String, String>>() {
 
@@ -130,23 +128,24 @@ public class QueryableStateIntegrationTest {
                 return o1.key.compareTo(o2.key);
             }
         };
-        inputValues = Arrays.asList("hello world",
-                                    "all streams lead to kafka",
-                                    "streams",
-                                    "kafka streams",
-                                    "the cat in the hat",
-                                    "green eggs and ham",
-                                    "that sam i am",
-                                    "up the creek without a paddle",
-                                    "run forest run",
-                                    "a tank full of gas",
-                                    "eat sleep rave repeat",
-                                    "one jolly sailor",
-                                    "king of the world");
+        inputValues = Arrays.asList(
+            "hello world",
+            "all streams lead to kafka",
+            "streams",
+            "kafka streams",
+            "the cat in the hat",
+            "green eggs and ham",
+            "that sam i am",
+            "up the creek without a paddle",
+            "run forest run",
+            "a tank full of gas",
+            "eat sleep rave repeat",
+            "one jolly sailor",
+            "king of the world");
         inputValuesKeys = new HashSet<>();
-        for (String sentence : inputValues) {
-            String[] words = sentence.split("\\W+");
-            for (String word : words) {
+        for (final String sentence : inputValues) {
+            final String[] words = sentence.split("\\W+");
+            for (final String word : words) {
                 inputValuesKeys.add(word);
             }
         }
@@ -163,20 +162,21 @@ public class QueryableStateIntegrationTest {
 
     /**
      * Creates a typical word count topology
+     *
      * @param inputTopic
      * @param outputTopic
      * @param streamsConfiguration config
      * @return
      */
-    private KafkaStreams createCountStream(String inputTopic, String outputTopic, Properties streamsConfiguration) {
-        KStreamBuilder builder = new KStreamBuilder();
+    private KafkaStreams createCountStream(final String inputTopic, final String outputTopic, final Properties streamsConfiguration) {
+        final KStreamBuilder builder = new KStreamBuilder();
         final Serde<String> stringSerde = Serdes.String();
         final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, inputTopic);
 
         final KGroupedStream<String, String> groupedByWord = textLines
             .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                 @Override
-                public Iterable<String> apply(String value) {
+                public Iterable<String> apply(final String value) {
                     return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                 }
             })
@@ -194,21 +194,22 @@ public class QueryableStateIntegrationTest {
     private class StreamRunnable implements Runnable {
         private final KafkaStreams myStream;
         private boolean closed = false;
-        StreamRunnable(String inputTopic, String outputTopic, int queryPort) {
-            Properties props = (Properties) streamsConfiguration.clone();
+
+        StreamRunnable(final String inputTopic, final String outputTopic, final int queryPort) {
+            final Properties props = (Properties) streamsConfiguration.clone();
             props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + queryPort);
-            this.myStream = createCountStream(inputTopic, outputTopic, props);
+            myStream = createCountStream(inputTopic, outputTopic, props);
         }
 
         @Override
         public void run() {
-            this.myStream.start();
+            myStream.start();
 
         }
 
         public void close() {
             if (!closed) {
-                this.myStream.close();
+                myStream.close();
                 closed = true;
             }
         }
@@ -237,7 +238,7 @@ public class QueryableStateIntegrationTest {
                     final ReadOnlyKeyValueStore<String, Long> store;
                     try {
                         store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-                    } catch (IllegalStateException e) {
+                    } catch (final IllegalStateException e) {
                         // Kafka Streams instance may have closed but rebalance hasn't happened
                         return false;
                     }
@@ -249,7 +250,7 @@ public class QueryableStateIntegrationTest {
 
 
     private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams,
-                                       final Set<String> keys, final String storeName ,
+                                       final Set<String> keys, final String storeName,
                                        final Long from, final Long to) throws Exception {
         for (final String key : keys) {
             TestUtils.waitForCondition(new TestCondition() {
@@ -264,7 +265,7 @@ public class QueryableStateIntegrationTest {
                     final ReadOnlyWindowStore<String, Long> store;
                     try {
                         store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
-                    } catch (IllegalStateException e) {
+                    } catch (final IllegalStateException e) {
                         // Kafka Streams instance may have closed but rebalance hasn't happened
                         return false;
                     }
@@ -277,18 +278,18 @@ public class QueryableStateIntegrationTest {
 
     @Test
     public void queryOnRebalance() throws Exception {
-        int numThreads = NUM_PARTITIONS;
-        StreamRunnable[] streamRunnables = new StreamRunnable[numThreads];
-        Thread[] streamThreads = new Thread[numThreads];
+        final int numThreads = NUM_PARTITIONS;
+        final StreamRunnable[] streamRunnables = new StreamRunnable[numThreads];
+        final Thread[] streamThreads = new Thread[numThreads];
         final int numIterations = 500000;
 
         // create concurrent producer
-        ProducerRunnable producerRunnable = new ProducerRunnable(STREAM_THREE, inputValues, numIterations);
-        Thread producerThread = new Thread(producerRunnable);
+        final ProducerRunnable producerRunnable = new ProducerRunnable(STREAM_TWO, inputValues, numIterations);
+        final Thread producerThread = new Thread(producerRunnable);
 
         // create three stream threads
         for (int i = 0; i < numThreads; i++) {
-            streamRunnables[i] = new StreamRunnable(STREAM_THREE, OUTPUT_TOPIC_THREE, i);
+            streamRunnables[i] = new StreamRunnable(STREAM_TWO, OUTPUT_TOPIC_THREE, i);
             streamThreads[i] = new Thread(streamRunnables[i]);
             streamThreads[i].start();
         }
@@ -299,9 +300,9 @@ public class QueryableStateIntegrationTest {
 
             for (int i = 0; i < numThreads; i++) {
                 verifyAllKVKeys(streamRunnables, streamRunnables[i].getStream(), inputValuesKeys,
-                    "word-count-store-" + STREAM_THREE);
+                    "word-count-store-" + STREAM_TWO);
                 verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), inputValuesKeys,
-                    "windowed-word-count-store-" + STREAM_THREE, 0L, WINDOW_SIZE);
+                    "windowed-word-count-store-" + STREAM_TWO, 0L, WINDOW_SIZE);
             }
 
             // kill N-1 threads
@@ -313,9 +314,9 @@ public class QueryableStateIntegrationTest {
 
             // query from the remaining thread
             verifyAllKVKeys(streamRunnables, streamRunnables[0].getStream(), inputValuesKeys,
-                "word-count-store-" + STREAM_THREE);
+                "word-count-store-" + STREAM_TWO);
             verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), inputValuesKeys,
-                "windowed-word-count-store-" + STREAM_THREE, 0L, WINDOW_SIZE);
+                "windowed-word-count-store-" + STREAM_TWO, 0L, WINDOW_SIZE);
         } finally {
             for (int i = 0; i < numThreads; i++) {
                 if (!streamRunnables[i].isClosed()) {
@@ -335,8 +336,8 @@ public class QueryableStateIntegrationTest {
 
         final int numIterations = 500000;
 
-        ProducerRunnable producerRunnable = new ProducerRunnable(STREAM_CONCURRENT, inputValues, numIterations);
-        Thread producerThread = new Thread(producerRunnable);
+        final ProducerRunnable producerRunnable = new ProducerRunnable(STREAM_CONCURRENT, inputValues, numIterations);
+        final Thread producerThread = new Thread(producerRunnable);
         kafkaStreams = createCountStream(STREAM_CONCURRENT, OUTPUT_TOPIC_CONCURRENT, streamsConfiguration);
         kafkaStreams.start();
         producerThread.start();
@@ -351,8 +352,8 @@ public class QueryableStateIntegrationTest {
                 kafkaStreams.store("windowed-word-count-store-" + STREAM_CONCURRENT, QueryableStoreTypes.<String, Long>windowStore());
 
 
-            Map<String, Long> expectedWindowState = new HashMap<>();
-            Map<String, Long> expectedCount = new HashMap<>();
+            final Map<String, Long> expectedWindowState = new HashMap<>();
+            final Map<String, Long> expectedCount = new HashMap<>();
             while (producerRunnable.getCurrIteration() < numIterations) {
                 verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState,
                     expectedCount, windowStore, keyValueStore, false);
@@ -369,7 +370,7 @@ public class QueryableStateIntegrationTest {
 
     @Test
     public void shouldBeAbleToQueryState() throws Exception {
-        KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
 
         final Set<KeyValue<String, String>> batch1 = new TreeSet<>(stringComparator);
@@ -382,7 +383,7 @@ public class QueryableStateIntegrationTest {
 
 
         final Set<KeyValue<String, Long>> expectedCount = new TreeSet<>(stringLongComparator);
-        for (String key : keys) {
+        for (final String key : keys) {
             expectedCount.add(new KeyValue<>(key, 1L));
         }
 
@@ -393,7 +394,8 @@ public class QueryableStateIntegrationTest {
                 CLUSTER.bootstrapServers(),
                 StringSerializer.class,
                 StringSerializer.class,
-                new Properties()));
+                new Properties()),
+            mockTime);
 
         final KStream<String, String> s1 = builder.stream(STREAM_ONE);
 
@@ -410,12 +412,12 @@ public class QueryableStateIntegrationTest {
             myCount = kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
 
         final ReadOnlyWindowStore<String, Long> windowStore =
-                kafkaStreams.store("windowed-count", QueryableStoreTypes.<String, Long>windowStore());
+            kafkaStreams.store("windowed-count", QueryableStoreTypes.<String, Long>windowStore());
         verifyCanGetByKey(keys,
-                          expectedCount,
-                          expectedCount,
-                          windowStore,
-                          myCount);
+            expectedCount,
+            expectedCount,
+            windowStore,
+            myCount);
 
         verifyRangeAndAll(expectedCount, myCount);
 
@@ -463,10 +465,10 @@ public class QueryableStateIntegrationTest {
 
         final long timeout = System.currentTimeMillis() + 30000;
         while (windowState.size() < 5 &&
-               countState.size() < 5 &&
-               System.currentTimeMillis() < timeout) {
+            countState.size() < 5 &&
+            System.currentTimeMillis() < timeout) {
             Thread.sleep(10);
-            for (String key : keys) {
+            for (final String key : keys) {
                 windowState.addAll(fetch(windowStore, key));
                 final Long value = myCount.get(key);
                 if (value != null) {
@@ -481,28 +483,29 @@ public class QueryableStateIntegrationTest {
     /**
      * Verify that the new count is greater than or equal to the previous count.
      * Note: this method changes the values in expectedWindowState and expectedCount
-     * @param keys All the keys we ever expect to find
+     *
+     * @param keys                  All the keys we ever expect to find
      * @param expectedWindowedCount Expected windowed count
-     * @param expectedCount Expected count
-     * @param windowStore Window Store
-     * @param keyValueStore Key-value store
-     * @param failIfKeyNotFound if true, tests fails if an expected key is not found in store. If false,
-     *                          the method merely inserts the new found key into the list of
-     *                          expected keys.
+     * @param expectedCount         Expected count
+     * @param windowStore           Window Store
+     * @param keyValueStore         Key-value store
+     * @param failIfKeyNotFound     if true, tests fails if an expected key is not found in store. If false,
+     *                              the method merely inserts the new found key into the list of
+     *                              expected keys.
      * @throws InterruptedException
      */
     private void verifyGreaterOrEqual(final String[] keys,
-                                      Map<String, Long> expectedWindowedCount,
-                                      Map<String, Long> expectedCount,
+                                      final Map<String, Long> expectedWindowedCount,
+                                      final Map<String, Long> expectedCount,
                                       final ReadOnlyWindowStore<String, Long> windowStore,
                                       final ReadOnlyKeyValueStore<String, Long> keyValueStore,
-                                      boolean failIfKeyNotFound)
+                                      final boolean failIfKeyNotFound)
         throws InterruptedException {
         final Map<String, Long> windowState = new HashMap<>();
         final Map<String, Long> countState = new HashMap<>();
 
-        for (String key : keys) {
-            Map<String, Long> map = fetchMap(windowStore, key);
+        for (final String key : keys) {
+            final Map<String, Long> map = fetchMap(windowStore, key);
             if (map.equals(Collections.<String, Long>emptyMap()) && failIfKeyNotFound) {
                 fail("Key not found " + key);
             }
@@ -517,9 +520,9 @@ public class QueryableStateIntegrationTest {
             }
         }
 
-        for (Map.Entry<String, Long> actualWindowStateEntry : windowState.entrySet()) {
+        for (final Map.Entry<String, Long> actualWindowStateEntry : windowState.entrySet()) {
             if (expectedWindowedCount.containsKey(actualWindowStateEntry.getKey())) {
-                Long expectedValue = expectedWindowedCount.get(actualWindowStateEntry.getKey());
+                final Long expectedValue = expectedWindowedCount.get(actualWindowStateEntry.getKey());
                 assertTrue(actualWindowStateEntry.getValue() >= expectedValue);
             } else {
                 if (failIfKeyNotFound) {
@@ -530,9 +533,9 @@ public class QueryableStateIntegrationTest {
             expectedWindowedCount.put(actualWindowStateEntry.getKey(), actualWindowStateEntry.getValue());
         }
 
-        for (Map.Entry<String, Long> actualCountStateEntry : countState.entrySet()) {
+        for (final Map.Entry<String, Long> actualCountStateEntry : countState.entrySet()) {
             if (expectedCount.containsKey(actualCountStateEntry.getKey())) {
-                Long expectedValue = expectedCount.get(actualCountStateEntry.getKey());
+                final Long expectedValue = expectedCount.get(actualCountStateEntry.getKey());
                 assertTrue(actualCountStateEntry.getValue() >= expectedValue);
             } else {
                 if (failIfKeyNotFound) {
@@ -545,28 +548,28 @@ public class QueryableStateIntegrationTest {
 
     }
 
-    private void waitUntilAtLeastNumRecordProcessed(String topic, int numRecs) throws InterruptedException {
+    private void waitUntilAtLeastNumRecordProcessed(final String topic, final int numRecs) throws InterruptedException {
         final Properties config = new Properties();
         config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer");
         config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                           StringDeserializer.class.getName());
+            StringDeserializer.class.getName());
         config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                           LongDeserializer.class.getName());
-        IntegrationTestUtils.waitUntilMinValuesRecordsReceived(config,
-                                                               topic,
-                                                               numRecs,
-                                                               60 *
-                                                               1000);
+            LongDeserializer.class.getName());
+        IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
+            config,
+            topic,
+            numRecs,
+            60 * 1000);
     }
 
     private Set<KeyValue<String, Long>> fetch(final ReadOnlyWindowStore<String, Long> store,
-                                                final String key) {
+                                              final String key) {
 
         final WindowStoreIterator<Long> fetch = store.fetch(key, 0, System.currentTimeMillis());
         if (fetch.hasNext()) {
-            KeyValue<Long, Long> next = fetch.next();
+            final KeyValue<Long, Long> next = fetch.next();
             return Collections.singleton(KeyValue.pair(key, next.value));
         }
         return Collections.emptySet();
@@ -577,7 +580,7 @@ public class QueryableStateIntegrationTest {
 
         final WindowStoreIterator<Long> fetch = store.fetch(key, 0, System.currentTimeMillis());
         if (fetch.hasNext()) {
-            KeyValue<Long, Long> next = fetch.next();
+            final KeyValue<Long, Long> next = fetch.next();
             return Collections.singletonMap(key, next.value);
         }
         return Collections.emptyMap();
@@ -588,13 +591,13 @@ public class QueryableStateIntegrationTest {
      * A class that periodically produces records in a separate thread
      */
     private class ProducerRunnable implements Runnable {
-        private String topic;
+        private final String topic;
         private final List<String> inputValues;
         private final int numIterations;
         private int currIteration = 0;
         boolean shutdown = false;
 
-        ProducerRunnable(String topic, List<String> inputValues, int numIterations) {
+        ProducerRunnable(final String topic, final List<String> inputValues, final int numIterations) {
             this.topic = topic;
             this.inputValues = inputValues;
             this.numIterations = numIterations;
@@ -603,16 +606,18 @@ public class QueryableStateIntegrationTest {
         private synchronized void incrementInteration() {
             currIteration++;
         }
+
         public synchronized int getCurrIteration() {
             return currIteration;
         }
+
         public synchronized void shutdown() {
             shutdown = true;
         }
 
         @Override
         public void run() {
-            Properties producerConfig = new Properties();
+            final Properties producerConfig = new Properties();
             producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
             producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
             producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
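
For context, the pattern the test diff above introduces looks roughly like the following sketch (class, topic, and method names here are illustrative, not from the commit; in a real test CLUSTER would be a @ClassRule so it is started automatically). All calls and signatures are taken from the diff itself:

    import java.util.Arrays;
    import java.util.Properties;

    import kafka.utils.MockTime;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
    import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
    import org.apache.kafka.test.TestUtils;

    public class MockTimeUsageSketch {
        // The cluster exposes the same MockTime instance its brokers run on.
        public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

        public void produceWithControlledTimestamps() throws Exception {
            final MockTime mockTime = CLUSTER.time;
            final Properties producerConfig = TestUtils.producerConfig(
                CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);

            // Each record is stamped with mockTime.milliseconds(); the helper then
            // advances the clock by 1 ms, so timestamps are strictly increasing
            // regardless of wall-clock scheduling.
            IntegrationTestUtils.produceValuesSynchronously(
                "input-topic", Arrays.asList("a", "b", "c"), producerConfig, mockTime);
        }
    }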

http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index dd43af6..fe0a0eb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -16,6 +16,7 @@
 
 package org.apache.kafka.streams.integration;
 
+import kafka.utils.MockTime;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
@@ -38,8 +39,8 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -65,11 +66,11 @@ import static org.junit.Assert.fail;
  * End-to-end integration test based on using regex and named topics for creating sources, using
  * an embedded Kafka cluster.
  */
-
 public class RegexSourceIntegrationTest {
     private static final int NUM_BROKERS = 1;
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
 
     private static final String TOPIC_1 = "topic-1";
     private static final String TOPIC_2 = "topic-2";
@@ -103,8 +104,8 @@ public class RegexSourceIntegrationTest {
     public void setUp() {
 
         streamsConfiguration = StreamsTestUtils.getStreamsConfig(CLUSTER.bootstrapServers(),
-                                                                 STRING_SERDE_CLASSNAME,
-                                                                 STRING_SERDE_CLASSNAME);
+            STRING_SERDE_CLASSNAME,
+            STRING_SERDE_CLASSNAME);
     }
 
     @After
@@ -120,28 +121,28 @@ public class RegexSourceIntegrationTest {
         final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1");
         final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
 
-        StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
+        final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
 
         CLUSTER.createTopic("TEST-TOPIC-1");
 
-        KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
 
         pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
 
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
 
-        Field streamThreadsField = streams.getClass().getDeclaredField("threads");
+        final Field streamThreadsField = streams.getClass().getDeclaredField("threads");
         streamThreadsField.setAccessible(true);
-        StreamThread[] streamThreads =  (StreamThread[]) streamThreadsField.get(streams);
-        StreamThread originalThread = streamThreads[0];
+        final StreamThread[] streamThreads = (StreamThread[]) streamThreadsField.get(streams);
+        final StreamThread originalThread = streamThreads[0];
 
         final TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig,
-                                           new DefaultKafkaClientSupplier(),
-                                           originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime());
+            new DefaultKafkaClientSupplier(),
+            originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime());
 
-        TestCondition oneTopicAdded  = new TestCondition() {
+        final TestCondition oneTopicAdded = new TestCondition() {
             @Override
             public boolean conditionMet() {
                 return testStreamThread.assignedTopicPartitions.equals(expectedFirstAssignment);
@@ -155,7 +156,7 @@ public class RegexSourceIntegrationTest {
 
         CLUSTER.createTopic("TEST-TOPIC-2");
 
-        TestCondition secondTopicAdded  = new TestCondition() {
+        final TestCondition secondTopicAdded = new TestCondition() {
             @Override
             public boolean conditionMet() {
                 return testStreamThread.assignedTopicPartitions.equals(expectedSecondAssignment);
@@ -174,31 +175,31 @@ public class RegexSourceIntegrationTest {
         final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
         final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B");
 
-        StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
+        final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
 
         CLUSTER.createTopic("TEST-TOPIC-A");
         CLUSTER.createTopic("TEST-TOPIC-B");
 
-        KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
+        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
 
         pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
 
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
 
-        Field streamThreadsField = streams.getClass().getDeclaredField("threads");
+        final Field streamThreadsField = streams.getClass().getDeclaredField("threads");
         streamThreadsField.setAccessible(true);
-        StreamThread[] streamThreads =  (StreamThread[]) streamThreadsField.get(streams);
-        StreamThread originalThread = streamThreads[0];
+        final StreamThread[] streamThreads = (StreamThread[]) streamThreadsField.get(streams);
+        final StreamThread originalThread = streamThreads[0];
 
         final TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig,
-                new DefaultKafkaClientSupplier(),
-                originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime());
+            new DefaultKafkaClientSupplier(),
+            originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime());
 
         streamThreads[0] = testStreamThread;
 
-        TestCondition bothTopicsAdded  = new TestCondition() {
+        final TestCondition bothTopicsAdded = new TestCondition() {
             @Override
             public boolean conditionMet() {
                 return testStreamThread.assignedTopicPartitions.equals(expectedFirstAssignment);
@@ -210,7 +211,7 @@ public class RegexSourceIntegrationTest {
 
         CLUSTER.deleteTopic("TEST-TOPIC-A");
 
-        TestCondition oneTopicRemoved  = new TestCondition() {
+        final TestCondition oneTopicRemoved = new TestCondition() {
             @Override
             public boolean conditionMet() {
                 return testStreamThread.assignedTopicPartitions.equals(expectedSecondAssignment);
@@ -226,45 +227,45 @@ public class RegexSourceIntegrationTest {
     @Test
     public void testShouldReadFromRegexAndNamedTopics() throws Exception {
 
-        String topic1TestMessage = "topic-1 test";
-        String topic2TestMessage = "topic-2 test";
-        String topicATestMessage = "topic-A test";
-        String topicCTestMessage = "topic-C test";
-        String topicYTestMessage = "topic-Y test";
-        String topicZTestMessage = "topic-Z test";
+        final String topic1TestMessage = "topic-1 test";
+        final String topic2TestMessage = "topic-2 test";
+        final String topicATestMessage = "topic-A test";
+        final String topicCTestMessage = "topic-C test";
+        final String topicYTestMessage = "topic-Y test";
+        final String topicZTestMessage = "topic-Z test";
 
 
         final Serde<String> stringSerde = Serdes.String();
 
-        KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d"));
-        KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]"));
-        KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);
+        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d"));
+        final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]"));
+        final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);
 
         pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
         pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
         namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
 
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
-        Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
+        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
 
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig, mockTime);
 
-        Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
+        final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
 
-        List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage);
-        List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6);
-        List<String> actualValues = new ArrayList<>(6);
+        final List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage);
+        final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6);
+        final List<String> actualValues = new ArrayList<>(6);
 
-        for (KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
+        for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
             actualValues.add(receivedKeyValue.value);
         }
 
@@ -278,34 +279,34 @@ public class RegexSourceIntegrationTest {
     @Test(expected = AssertionError.class)
     public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
 
-        String fooMessage = "fooMessage";
-        String fMessage = "fMessage";
+        final String fooMessage = "fooMessage";
+        final String fMessage = "fMessage";
 
 
         final Serde<String> stringSerde = Serdes.String();
 
-        KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
 
         // overlapping patterns here, no messages should be sent as TopologyBuilderException
         // will be thrown when the processor topology is built.
 
-        KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("foo.*"));
-        KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("f.*"));
+        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("foo.*"));
+        final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("f.*"));
 
 
         pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
         pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
 
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
-        Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
+        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
 
-        IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage), producerConfig);
-        IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig);
+        IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig, mockTime);
 
-        Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
+        final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
 
         try {
             IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000);
@@ -319,14 +320,14 @@ public class RegexSourceIntegrationTest {
     private class TestStreamThread extends StreamThread {
         public volatile List<String> assignedTopicPartitions = new ArrayList<>();
 
-        public TestStreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, String applicationId, String clientId, UUID processId, Metrics metrics, Time time) {
+        public TestStreamThread(final TopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier, final String applicationId, final String clientId, final UUID processId, final Metrics metrics, final Time time) {
             super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder));
         }
 
         @Override
-        public StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
-            List<String> topicPartitions = new ArrayList<>();
-            for (TopicPartition partition : partitions) {
+        public StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
+            final List<String> topicPartitions = new ArrayList<>();
+            for (final TopicPartition partition : partitions) {
                 topicPartitions.add(partition.topic());
             }
             Collections.sort(topicPartitions);
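
Continuing the hypothetical sketch above: because the brokers and the produce helpers now share one clock, a test can move time forward explicitly, for example to step past the 60 s windows used in QueryableStateIntegrationTest. Only milliseconds() and sleep() are assumed here, both of which the diff itself calls on the shared Time:

    final MockTime mockTime = CLUSTER.time;
    final long start = mockTime.milliseconds();

    // Two records: stamped start and start + 1; the clock ends at start + 2.
    IntegrationTestUtils.produceValuesSynchronously(
        "input-topic", Arrays.asList("a", "b"), producerConfig, mockTime);

    // Jump the shared clock past a 60 s window; the brokers and any records
    // produced afterwards all observe the new time.
    mockTime.sleep(60 * 1000L);
    IntegrationTestUtils.produceValuesSynchronously(
        "input-topic", Arrays.asList("c"), producerConfig, mockTime);
    // "c" is stamped start + 60002; the clock now reads start + 60003.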

http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 8e9101d..9c0cbe1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.integration.utils;
 
 import kafka.server.KafkaConfig$;
+import kafka.utils.MockTime;
 import kafka.zk.EmbeddedZookeeper;
 import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
@@ -36,15 +37,17 @@ public class EmbeddedKafkaCluster extends ExternalResource {
     private EmbeddedZookeeper zookeeper = null;
     private final KafkaEmbedded[] brokers;
 
-    public EmbeddedKafkaCluster(int numBrokers) {
-        this.brokers = new KafkaEmbedded[numBrokers];
+    public EmbeddedKafkaCluster(final int numBrokers) {
+        brokers = new KafkaEmbedded[numBrokers];
     }
 
+    public MockTime time = new MockTime();
+
     /**
      * Creates and starts a Kafka cluster.
      */
     public void start() throws IOException, InterruptedException {
-        Properties brokerConfig = new Properties();
+        final Properties brokerConfig = new Properties();
 
         log.debug("Initiating embedded Kafka cluster startup");
         log.debug("Starting a ZooKeeper instance");
@@ -56,10 +59,10 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         brokerConfig.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
         brokerConfig.put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
 
-        for (int i = 0; i < this.brokers.length; i++) {
+        for (int i = 0; i < brokers.length; i++) {
             brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
             log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp()));
-            brokers[i] = new KafkaEmbedded(brokerConfig);
+            brokers[i] = new KafkaEmbedded(brokerConfig, time);
 
             log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}",
                 brokers[i].brokerList(), brokers[i].zookeeperConnect());
@@ -70,8 +73,8 @@ public class EmbeddedKafkaCluster extends ExternalResource {
      * Stop the Kafka cluster.
      */
     public void stop() {
-        for (int i = 0; i < this.brokers.length; i++) {
-            brokers[i].stop();
+        for (final KafkaEmbedded broker : brokers) {
+            broker.stop();
         }
         zookeeper.shutdown();
     }
@@ -79,7 +82,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
     /**
      * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format.
      * Example: `127.0.0.1:2181`.
-     *
+     * <p>
      * You can use this to e.g. tell Kafka brokers how to connect to this instance.
      */
     public String zKConnectString() {
@@ -88,7 +91,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
 
     /**
      * This cluster's `bootstrap.servers` value.  Example: `127.0.0.1:9092`.
-     *
+     * <p>
      * You can use this to tell Kafka producers how to connect to this cluster.
      */
     public String bootstrapServers() {
@@ -108,7 +111,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
      *
      * @param topic The name of the topic.
      */
-    public void createTopic(String topic) {
+    public void createTopic(final String topic) {
         createTopic(topic, 1, 1, new Properties());
     }
 
@@ -119,7 +122,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
      * @param partitions  The number of partitions for this topic.
      * @param replication The replication factor for (the partitions of) this topic.
      */
-    public void createTopic(String topic, int partitions, int replication) {
+    public void createTopic(final String topic, final int partitions, final int replication) {
         createTopic(topic, partitions, replication, new Properties());
     }
 
@@ -131,14 +134,14 @@ public class EmbeddedKafkaCluster extends ExternalResource {
      * @param replication The replication factor for (partitions of) this topic.
      * @param topicConfig Additional topic-level configuration settings.
      */
-    public void createTopic(String topic,
-                            int partitions,
-                            int replication,
-                            Properties topicConfig) {
+    public void createTopic(final String topic,
+                            final int partitions,
+                            final int replication,
+                            final Properties topicConfig) {
         brokers[0].createTopic(topic, partitions, replication, topicConfig);
     }
 
-    public void deleteTopic(String topic) {
+    public void deleteTopic(final String topic) {
         brokers[0].deleteTopic(topic);
     }
 }
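
The helper change that the call sites above depend on follows in IntegrationTestUtils: produceKeyValuesSynchronously no longer sends the whole batch with a null (broker-assigned) timestamp; it sends records one at a time stamped with the mock clock and ticks the clock between sends. Tests that want a single explicit timestamp for a batch can still call the lower-level helper directly; a minimal hypothetical call (topic, key, and value are placeholders, signature as in the diff):

    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
        "input-topic",
        Collections.singleton(new KeyValue<>("key", "value")),
        producerConfig,
        mockTime.milliseconds());  // or any explicit Long timestamp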

http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 1a1a561..117e6ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.integration.utils;
 
+import kafka.utils.Time;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -57,10 +58,10 @@ public class IntegrationTestUtils {
      * @param maxMessages    Maximum number of messages to read via the consumer.
      * @return The values retrieved via the consumer.
      */
-    public static <V> List<V> readValues(String topic, Properties consumerConfig, int maxMessages) {
-        List<V> returnList = new ArrayList<>();
-        List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages);
-        for (KeyValue<?, V> kv : kvs) {
+    public static <V> List<V> readValues(final String topic, final Properties consumerConfig, final int maxMessages) {
+        final List<V> returnList = new ArrayList<>();
+        final List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages);
+        for (final KeyValue<?, V> kv : kvs) {
             returnList.add(kv.value);
         }
         return returnList;
@@ -74,7 +75,7 @@ public class IntegrationTestUtils {
      * @param consumerConfig Kafka consumer configuration
      * @return The KeyValue elements retrieved via the consumer.
      */
-    public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig) {
+    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig) {
         return readKeyValues(topic, consumerConfig, UNLIMITED_MESSAGES);
     }
 
@@ -87,17 +88,17 @@ public class IntegrationTestUtils {
      * @param maxMessages    Maximum number of messages to read via the consumer
      * @return The KeyValue elements retrieved via the consumer
      */
-    public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig, int maxMessages) {
-        KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig);
+    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig, final int maxMessages) {
+        final KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig);
         consumer.subscribe(Collections.singletonList(topic));
-        int pollIntervalMs = 100;
-        int maxTotalPollTimeMs = 2000;
+        final int pollIntervalMs = 100;
+        final int maxTotalPollTimeMs = 2000;
         int totalPollTimeMs = 0;
-        List<KeyValue<K, V>> consumedValues = new ArrayList<>();
+        final List<KeyValue<K, V>> consumedValues = new ArrayList<>();
         while (totalPollTimeMs < maxTotalPollTimeMs && continueConsuming(consumedValues.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
-            ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
-            for (ConsumerRecord<K, V> record : records) {
+            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+            for (final ConsumerRecord<K, V> record : records) {
                 consumedValues.add(new KeyValue<>(record.key(), record.value()));
             }
         }
@@ -105,7 +106,7 @@ public class IntegrationTestUtils {
         return consumedValues;
     }
 
-    private static boolean continueConsuming(int messagesConsumed, int maxMessages) {
+    private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) {
         return maxMessages <= 0 || messagesConsumed < maxMessages;
     }
 
@@ -114,11 +115,11 @@ public class IntegrationTestUtils {
      *
      * @param streamsConfiguration Streams configuration settings
      */
-    public static void purgeLocalStreamsState(Properties streamsConfiguration) throws IOException {
+    public static void purgeLocalStreamsState(final Properties streamsConfiguration) throws IOException {
         final String tmpDir = TestUtils.IO_TMP_DIR.getPath();
-        String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG);
+        final String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG);
         if (path != null) {
-            File node = Paths.get(path).normalize().toFile();
+            final File node = Paths.get(path).normalize().toFile();
             // Only purge state when it's under java.io.tmpdir.  This is a safety net to prevent accidentally
             // deleting important local directory trees.
             if (node.getAbsolutePath().startsWith(tmpDir)) {
@@ -135,22 +136,25 @@ public class IntegrationTestUtils {
      * @param <V>            Value type of the data records
      */
     public static <K, V> void produceKeyValuesSynchronously(
-        String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
+        final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Time time)
         throws ExecutionException, InterruptedException {
-        produceKeyValuesSynchronouslyWithTimestamp(topic,
-                                                   records,
-                                                   producerConfig,
-                                                   null);
+        for (final KeyValue<K, V> record : records) {
+            produceKeyValuesSynchronouslyWithTimestamp(topic,
+                Collections.singleton(record),
+                producerConfig,
+                time.milliseconds());
+            time.sleep(1L);
+        }
     }
 
-    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(String topic,
-                                                                         Collection<KeyValue<K, V>> records,
-                                                                         Properties producerConfig,
-                                                                         Long timestamp)
+    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
+                                                                         final Collection<KeyValue<K, V>> records,
+                                                                         final Properties producerConfig,
+                                                                         final Long timestamp)
         throws ExecutionException, InterruptedException {
-        Producer<K, V> producer = new KafkaProducer<>(producerConfig);
-        for (KeyValue<K, V> record : records) {
-            Future<RecordMetadata> f = producer.send(
+        final Producer<K, V> producer = new KafkaProducer<>(producerConfig);
+        for (final KeyValue<K, V> record : records) {
+            final Future<RecordMetadata> f = producer.send(
                 new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
             f.get();
         }
@@ -159,92 +163,94 @@ public class IntegrationTestUtils {
     }
 
     public static <V> void produceValuesSynchronously(
-        String topic, Collection<V> records, Properties producerConfig)
+        final String topic, final Collection<V> records, final Properties producerConfig, final Time time)
         throws ExecutionException, InterruptedException {
-        Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>();
-        for (V value : records) {
-            KeyValue<Object, V> kv = new KeyValue<>(null, value);
+        final Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>();
+        for (final V value : records) {
+            final KeyValue<Object, V> kv = new KeyValue<>(null, value);
             keyedRecords.add(kv);
         }
-        produceKeyValuesSynchronously(topic, keyedRecords, producerConfig);
+        produceKeyValuesSynchronously(topic, keyedRecords, producerConfig, time);
     }
 
-    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig,
-                                                                                  String topic,
-                                                                                  int expectedNumRecords) throws InterruptedException {
+    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
+                                                                                  final String topic,
+                                                                                  final int expectedNumRecords) throws InterruptedException {
 
         return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
     }
 
     /**
      * Wait until enough data (key-value records) has been consumed.
-     * @param consumerConfig Kafka Consumer configuration
-     * @param topic          Topic to consume from
+     *
+     * @param consumerConfig     Kafka Consumer configuration
+     * @param topic              Topic to consume from
      * @param expectedNumRecords Minimum number of expected records
-     * @param waitTime       Upper bound in waiting time in milliseconds
+     * @param waitTime           Upper bound on wait time, in milliseconds
      * @return All the records consumed, at least expectedNumRecords of them
      * @throws InterruptedException
-     * @throws AssertionError if the given wait time elapses
+     * @throws AssertionError       if the given wait time elapses before expectedNumRecords are received
      */
     public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                                   final String topic,
                                                                                   final int expectedNumRecords,
-                                                                                  long waitTime) throws InterruptedException {
+                                                                                  final long waitTime) throws InterruptedException {
         final List<KeyValue<K, V>> accumData = new ArrayList<>();
 
-        TestCondition valuesRead = new TestCondition() {
+        final TestCondition valuesRead = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig);
+                final List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig);
                 accumData.addAll(readData);
                 return accumData.size() >= expectedNumRecords;
             }
         };
 
-        String conditionDetails = "Did not receive " + expectedNumRecords + " number of records";
+        final String conditionDetails = "Did not receive all " + expectedNumRecords + " records";
 
         TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
 
         return accumData;
     }
 
-    public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig,
-                                                                String topic,
-                                                                int expectedNumRecords) throws InterruptedException {
+    public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
+                                                                final String topic,
+                                                                final int expectedNumRecords) throws InterruptedException {
 
         return waitUntilMinValuesRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
     }
 
     /**
      * Wait until enough data (value records) has been consumed.
-     * @param consumerConfig Kafka Consumer configuration
-     * @param topic          Topic to consume from
+     *
+     * @param consumerConfig     Kafka Consumer configuration
+     * @param topic              Topic to consume from
      * @param expectedNumRecords Minimum number of expected records
-     * @param waitTime       Upper bound in waiting time in milliseconds
+     * @param waitTime           Upper bound on wait time, in milliseconds
      * @return All the records consumed, at least expectedNumRecords of them
      * @throws InterruptedException
-     * @throws AssertionError if the given wait time elapses
+     * @throws AssertionError       if the given wait time elapses before expectedNumRecords are received
      */
     public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
                                                                 final String topic,
                                                                 final int expectedNumRecords,
-                                                                long waitTime) throws InterruptedException {
+                                                                final long waitTime) throws InterruptedException {
         final List<V> accumData = new ArrayList<>();
 
-        TestCondition valuesRead = new TestCondition() {
+        final TestCondition valuesRead = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                List<V> readData = readValues(topic, consumerConfig, expectedNumRecords);
+                final List<V> readData = readValues(topic, consumerConfig, expectedNumRecords);
                 accumData.addAll(readData);
                 return accumData.size() >= expectedNumRecords;
             }
         };
 
-        String conditionDetails = "Did not receive " + expectedNumRecords + " number of records";
+        final String conditionDetails = "Did not receive all " + expectedNumRecords + " records";
 
         TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
 
         return accumData;
     }
 
-}
\ No newline at end of file
+}
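
The wait helpers in this file poll instead of sleeping: they re-run readKeyValues() or
readValues() under TestUtils.waitForCondition() until at least expectedNumRecords have
accumulated, and fail with an AssertionError once the wait time elapses. A typical call
site, assuming a consumerConfig with bootstrap servers and deserializers already set
(topic name and types illustrative):

    // Blocks until at least 3 records arrive on "output-topic",
    // or fails after the default timeout.
    final List<KeyValue<String, Long>> received =
        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, "output-topic", 3);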

http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 43b82d6..ac9b670 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -17,36 +17,33 @@
 
 package org.apache.kafka.streams.integration.utils;
 
-
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.List;
-
 import kafka.admin.AdminUtils;
 import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaConfig$;
 import kafka.server.KafkaServer;
 import kafka.utils.CoreUtils;
-import kafka.utils.SystemTime$;
+import kafka.utils.MockTime;
 import kafka.utils.TestUtils;
 import kafka.utils.ZKStringSerializer$;
 import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
 /**
  * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by
  * default.
- *
+ * <p>
  * Requires a running ZooKeeper instance to connect to.
  */
 public class KafkaEmbedded {
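
A hedged lifecycle sketch for this class, assuming a ZooKeeper instance is already running
as the comment requires, and a test method that declares IOException (the empty Properties
and topic settings are test-local choices; stop() is assumed to be the shutdown method
whose body appears further below):

    final MockTime time = new MockTime();
    final KafkaEmbedded broker = new KafkaEmbedded(new Properties(), time);
    broker.createTopic("test-topic", 2, 1);  // 2 partitions, replication factor 1
    // ... exercise producers/consumers against broker.brokerList() ...
    broker.stop();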
@@ -56,6 +53,7 @@ public class KafkaEmbedded {
     private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181";
     private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
     private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;
+
     private final Properties effectiveConfig;
     private final File logDir;
     public final TemporaryFolder tmpFolder;
@@ -63,20 +61,21 @@ public class KafkaEmbedded {
 
     /**
      * Creates and starts an embedded Kafka broker.
+     *
      * @param config Broker configuration settings.  Used to modify, for example, on which port the
      *               broker should listen.  Note that you cannot change the `log.dirs` setting
      *               currently.
      */
-    public KafkaEmbedded(Properties config) throws IOException {
+    public KafkaEmbedded(final Properties config, final MockTime time) throws IOException {
         tmpFolder = new TemporaryFolder();
         tmpFolder.create();
         logDir = tmpFolder.newFolder();
         effectiveConfig = effectiveConfigFrom(config);
-        boolean loggingEnabled = true;
-        KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled);
+        final boolean loggingEnabled = true;
+        final KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled);
         log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...",
             logDir, zookeeperConnect());
-        kafka = TestUtils.createServer(kafkaConfig, SystemTime$.MODULE$);
+        kafka = TestUtils.createServer(kafkaConfig, time);
         log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
             brokerList(), zookeeperConnect());
     }
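
Swapping SystemTime$.MODULE$ for an injected MockTime is the heart of this change: the
embedded broker and the test now share one controllable clock, so tests can advance time
deterministically instead of sleeping for real. For instance:

    final MockTime time = new MockTime();
    final long before = time.milliseconds();
    time.sleep(60 * 1000L);  // moves the mock clock forward instantly, no real waiting
    // time.milliseconds() now returns before + 60 * 1000L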
@@ -85,12 +84,13 @@ public class KafkaEmbedded {
     /**
      * Creates the configuration for starting the Kafka broker by merging default values with
      * overrides.
+     *
      * @param initialConfig Broker configuration settings that override the default config.
      * @return The effective broker configuration: built-in defaults overridden by initialConfig
      * @throws IOException
      */
-    private Properties effectiveConfigFrom(Properties initialConfig) throws IOException {
-        Properties effectiveConfig = new Properties();
+    private Properties effectiveConfigFrom(final Properties initialConfig) throws IOException {
+        final Properties effectiveConfig = new Properties();
         effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0);
         effectiveConfig.put(KafkaConfig$.MODULE$.HostNameProp(), "127.0.0.1");
         effectiveConfig.put(KafkaConfig$.MODULE$.PortProp(), "9092");
@@ -106,7 +106,7 @@ public class KafkaEmbedded {
 
     /**
      * This broker's `metadata.broker.list` value.  Example: `127.0.0.1:9092`.
-     *
+     * <p>
      * You can use this to tell Kafka producers and consumers how to connect to this instance.
      */
     public String brokerList() {
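
In practice brokerList() feeds straight into client bootstrap settings, e.g. (group id
illustrative):

    final Properties consumerConfig = new Properties();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.brokerList());
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "embedded-test");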
@@ -130,7 +130,7 @@ public class KafkaEmbedded {
         kafka.shutdown();
         kafka.awaitShutdown();
         log.debug("Removing logs.dir at {} ...", logDir);
-        List<String> logDirs = Collections.singletonList(logDir.getAbsolutePath());
+        final List<String> logDirs = Collections.singletonList(logDir.getAbsolutePath());
         CoreUtils.delete(scala.collection.JavaConversions.asScalaBuffer(logDirs).seq());
         tmpFolder.delete();
         log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
@@ -142,7 +142,7 @@ public class KafkaEmbedded {
      *
      * @param topic The name of the topic.
      */
-    public void createTopic(String topic) {
+    public void createTopic(final String topic) {
         createTopic(topic, 1, 1, new Properties());
     }
 
@@ -153,7 +153,7 @@ public class KafkaEmbedded {
      * @param partitions  The number of partitions for this topic.
      * @param replication The replication factor for (the partitions of) this topic.
      */
-    public void createTopic(String topic, int partitions, int replication) {
+    public void createTopic(final String topic, final int partitions, final int replication) {
         createTopic(topic, partitions, replication, new Properties());
     }
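
The createTopic() overloads form a convenience ladder that all ends in the four-argument
method below; the topic-level setting here is illustrative:

    broker.createTopic("events");                    // 1 partition, replication factor 1
    broker.createTopic("events-by-region", 4, 1);    // 4 partitions, replication factor 1
    final Properties topicConfig = new Properties();
    topicConfig.put("cleanup.policy", "compact");    // e.g. a compacted changelog topic
    broker.createTopic("changelog", 1, 1, topicConfig);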
 
@@ -165,10 +165,10 @@ public class KafkaEmbedded {
      * @param replication The replication factor for (partitions of) this topic.
      * @param topicConfig Additional topic-level configuration settings.
      */
-    public void createTopic(String topic,
-                            int partitions,
-                            int replication,
-                            Properties topicConfig) {
+    public void createTopic(final String topic,
+                            final int partitions,
+                            final int replication,
+                            final Properties topicConfig) {
         log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
             topic, partitions, replication, topicConfig);
 
@@ -176,29 +176,29 @@ public class KafkaEmbedded {
         // createTopic() will only seem to work (it will return without error).  The topic will exist
         // only in ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
         // topic.
-        ZkClient zkClient = new ZkClient(
+        final ZkClient zkClient = new ZkClient(
             zookeeperConnect(),
             DEFAULT_ZK_SESSION_TIMEOUT_MS,
             DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
             ZKStringSerializer$.MODULE$);
-        boolean isSecure = false;
-        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure);
+        final boolean isSecure = false;
+        final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure);
         AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
         zkClient.close();
     }
 
-    public void deleteTopic(String topic) {
+    public void deleteTopic(final String topic) {
         log.debug("Deleting topic { name: {} }", topic);
 
-        ZkClient zkClient = new ZkClient(
-                zookeeperConnect(),
-                DEFAULT_ZK_SESSION_TIMEOUT_MS,
-                DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
-                ZKStringSerializer$.MODULE$);
-        boolean isSecure = false;
-        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure);
+        final ZkClient zkClient = new ZkClient(
+            zookeeperConnect(),
+            DEFAULT_ZK_SESSION_TIMEOUT_MS,
+            DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
+            ZKStringSerializer$.MODULE$);
+        final boolean isSecure = false;
+        final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure);
         AdminUtils.deleteTopic(zkUtils, topic);
         zkClient.close();
     }
 
-}
\ No newline at end of file
+}
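
deleteTopic() mirrors createTopic(): each call opens a short-lived ZkClient/ZkUtils
session, acts through AdminUtils, and closes the client again, so topic management stays
a one-liner in tests:

    broker.createTopic("events", 4, 1);   // 4 partitions, replication factor 1
    // ... produce and consume against "events" ...
    broker.deleteTopic("events");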

