kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3842: consolidate utility methods to TestUtils
Date Fri, 24 Jun 2016 20:42:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ef42c224a -> 88924b03d


KAFKA-3842: consolidate utility methods to TestUtils

Consolidated utility methods from StateTestUtils into TestUtils, added method for pausing tests to TestUtils

Changes made:
 1. Added utility method for creating consumer configs.
 2. Added methods for creating producer, consumer configs with default values for de/serializers.
 3. Pulled out method for waiting for test state to TestUtils (not using Thread.sleep).
 4. Added utility class for creating streams configs and methods providing default de/serializers.

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1532 from bbejeck/KAFKA_3842_add_helper_functions_test_utils


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/88924b03
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/88924b03
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/88924b03

Branch: refs/heads/trunk
Commit: 88924b03d511132212ee5d8dec02063deb8313f1
Parents: ef42c22
Author: Bill Bejeck <bbejeck@gmail.com>
Authored: Fri Jun 24 13:42:53 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Jun 24 13:42:53 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/test/TestCondition.java    | 26 ++++++
 .../java/org/apache/kafka/test/TestUtils.java   | 65 ++++++++++++++
 .../InternalTopicIntegrationTest.java           |  4 +-
 .../integration/JoinIntegrationTest.java        |  4 +-
 .../KGroupedStreamIntegrationTest.java          |  3 +-
 .../integration/KStreamRepartitionJoinTest.java |  3 +-
 .../integration/RegexSourceIntegrationTest.java | 92 ++++++++------------
 .../integration/utils/IntegrationTestUtils.java | 69 +++++++++------
 .../internals/ProcessorTopologyTest.java        |  4 +-
 .../streams/state/KeyValueStoreTestDriver.java  |  3 +-
 .../kafka/streams/state/StateTestUtils.java     | 79 -----------------
 .../org/apache/kafka/test/StreamsTestUtils.java | 56 ++++++++++++
 12 files changed, 236 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/clients/src/test/java/org/apache/kafka/test/TestCondition.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestCondition.java b/clients/src/test/java/org/apache/kafka/test/TestCondition.java
new file mode 100644
index 0000000..f78c91b
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/TestCondition.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may
obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+
+package org.apache.kafka.test;
+
+/**
+ * Interface to wrap actions that are required to wait until a condition is met
+ * for testing purposes.  Note that this is not intended to do any assertions.
+ */
+public interface TestCondition {
+
+    boolean conditionMet();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index a818d53..372954a 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -29,7 +29,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.UUID;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
@@ -126,6 +128,18 @@ public class TestUtils {
     }
 
     /**
+     * Create a temporary directory with the prefix "test" under /tmp
+     * @return  the temporary directory just created.
+     */
+    public static File tempDir() {
+        try {
+            return tempDirectory(new File("/tmp").toPath(), "test");
+        } catch (IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+    }
+
+    /**
      * Create a temporary relative directory in the specified parent directory with the given
prefix.
      *
      * @param parent The parent folder path name, if null using the default temporary-file
directory
@@ -178,4 +192,55 @@ public class TestUtils {
         properties.putAll(additional);
         return properties;
     }
+
+    public static Properties producerConfig(final String bootstrapServers, Class keySerializer,
Class valueSerializer) {
+        return producerConfig(bootstrapServers, keySerializer, valueSerializer, new Properties());
+    }
+
+    public static Properties consumerConfig(final String bootstrapServers,
+                                            final String groupId,
+                                            final Class keyDeserializer,
+                                            final Class valueDeserializer,
+                                            final Properties additional) {
+
+        final Properties consumerConfig = new Properties();
+        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
+        consumerConfig.putAll(additional);
+        return consumerConfig;
+    }
+
+    /**
+     * returns consumer config with random UUID for the Group ID
+     */
+    public static Properties consumerConfig(final String bootstrapServers, Class keyDeserializer,
Class valueDeserializer) {
+        return consumerConfig(bootstrapServers,
+                              UUID.randomUUID().toString(),
+                              keyDeserializer,
+                              valueDeserializer,
+                              new Properties());
+    }
+
+    /**
+     *  uses default value of 30 seconds for timeout
+     */
+    public static void waitForCondition(TestCondition testCondition) throws InterruptedException
{
+        waitForCondition(testCondition, 30000);
+    }
+
+    /**
+     *  Used to wait for a specific condition/state to be met during a test;
+     *  this is meant to be a replacement for using Thread.sleep
+     */
+    public static void waitForCondition(TestCondition testCondition, long maxTimeMillis)
throws InterruptedException {
+        long startTime = System.currentTimeMillis();
+
+        while (!testCondition.conditionMet() && ((System.currentTimeMillis() - startTime)
< maxTimeMillis)) {
+            Thread.sleep(Math.min(maxTimeMillis, 100L));
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index addebae..08406d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.StateTestUtils;
+import org.apache.kafka.test.TestUtils;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -123,7 +123,7 @@ public class InternalTopicIntegrationTest {
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, StateTestUtils.tempDir().getPath());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDir().getPath());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         KStreamBuilder builder = new KStreamBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index f251a85..bf01cbc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -32,7 +32,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.state.StateTestUtils;
+import org.apache.kafka.test.TestUtils;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -149,7 +149,7 @@ public class JoinIntegrationTest {
         // with automatically) we don't need to set this anymore and can update `purgeLocalStreamsState`
         // accordingly.
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
-                                 StateTestUtils.tempDir().getPath());
+                                 TestUtils.tempDir().getPath());
 
         // Remove any state from previous test runs
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);

http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
index 1ec6573..b381251 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.state.StateTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -81,7 +80,7 @@ public class KGroupedStreamIntegrationTest {
             .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, StateTestUtils.tempDir().getPath());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDir().getPath());
 
         KeyValueMapper<Integer, String, String>
             mapper =

http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index c852513..7dabc33 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.state.StateTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -81,7 +80,7 @@ public class KStreamRepartitionJoinTest {
             .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, StateTestUtils.tempDir().getPath());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDir().getPath());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index cf48391..02f971e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -16,8 +16,6 @@
 
 package org.apache.kafka.streams.integration;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
@@ -39,6 +37,9 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -59,6 +60,7 @@ import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * End-to-end integration test based on using regex and named topics for creating sources,
using
@@ -82,6 +84,7 @@ public class RegexSourceIntegrationTest {
     private static final int SECOND_UPDATE = 1;
 
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+    private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
     private Properties streamsConfiguration;
 
 
@@ -100,7 +103,10 @@ public class RegexSourceIntegrationTest {
 
     @Before
     public void setUp() {
-        streamsConfiguration = getStreamsConfig();
+
+        streamsConfiguration = StreamsTestUtils.getStreamsConfig(CLUSTER.bootstrapServers(),
+                                                                 STRING_SERDE_CLASSNAME,
+                                                                 STRING_SERDE_CLASSNAME);
     }
 
     @After
@@ -135,13 +141,16 @@ public class RegexSourceIntegrationTest {
                                            new DefaultKafkaClientSupplier(),
                                            originalThread.applicationId, originalThread.clientId,
originalThread.processId, new Metrics(), new SystemTime());
 
+        TestCondition tasksUpdated = createTasksUpdatedCondition(testStreamThread);
+
         streamThreads[0] = testStreamThread;
         streams.start();
-        testStreamThread.waitUntilTasksUpdated();
+
+        TestUtils.waitForCondition(tasksUpdated);
 
         CLUSTER.createTopic("TEST-TOPIC-2");
 
-        testStreamThread.waitUntilTasksUpdated();
+        TestUtils.waitForCondition(tasksUpdated);
 
         streams.close();
 
@@ -180,13 +189,15 @@ public class RegexSourceIntegrationTest {
                 originalThread.applicationId, originalThread.clientId, originalThread.processId,
new Metrics(), new SystemTime());
 
         streamThreads[0] = testStreamThread;
+        TestCondition tasksUpdated = createTasksUpdatedCondition(testStreamThread);
+
         streams.start();
 
-        testStreamThread.waitUntilTasksUpdated();
+        TestUtils.waitForCondition(tasksUpdated);
 
         CLUSTER.deleteTopic("TEST-TOPIC-A");
 
-        testStreamThread.waitUntilTasksUpdated();
+        TestUtils.waitForCondition(tasksUpdated);
 
         streams.close();
 
@@ -224,7 +235,7 @@ public class RegexSourceIntegrationTest {
         KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
-        Properties producerConfig = getProducerConfig();
+        Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(),
StringSerializer.class, StringSerializer.class);
 
         IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage),
producerConfig);
         IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage),
producerConfig);
@@ -233,7 +244,7 @@ public class RegexSourceIntegrationTest {
         IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage),
producerConfig);
         IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage),
producerConfig);
 
-        Properties consumerConfig = getConsumerConfig();
+        Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(),
StringDeserializer.class, StringDeserializer.class);
 
         List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage,
topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage);
         List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
DEFAULT_OUTPUT_TOPIC, 6);
@@ -272,61 +283,25 @@ public class RegexSourceIntegrationTest {
         pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
         pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
 
-
-        // Remove any state from previous test runs
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-
         KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
-        Properties producerConfig = getProducerConfig();
+        Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(),
StringSerializer.class, StringSerializer.class);
 
         IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage),
producerConfig);
         IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage),
producerConfig);
 
-        Properties consumerConfig = getConsumerConfig();
+        Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(),
StringDeserializer.class, StringDeserializer.class);
 
         try {
             IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC,
2, 5000);
+            fail("Should not get here");
         } finally {
             streams.close();
         }
 
     }
 
-    private Properties getProducerConfig() {
-        Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
-        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
-        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        return producerConfig;
-    }
-
-    private Properties getStreamsConfig() {
-        Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "regex-source-integration-test");
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
-        streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        return streamsConfiguration;
-    }
-
-    private Properties getConsumerConfig() {
-        Properties consumerConfig = new Properties();
-        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "regex-source-integration-consumer");
-        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-        return consumerConfig;
-    }
-
     private class TestStreamThread extends StreamThread {
 
         public Map<Integer, List<String>> assignedTopicPartitions = new HashMap<>();
@@ -349,16 +324,21 @@ public class RegexSourceIntegrationTest {
             return super.createStreamTask(id, partitions);
         }
 
+    }
 
-        void waitUntilTasksUpdated() {
-            long maxTimeMillis = 30000;
-            long startTime = System.currentTimeMillis();
-            while (!streamTaskUpdated && ((System.currentTimeMillis() - startTime)
< maxTimeMillis)) {
-               //empty loop just waiting for update
-            }
-            streamTaskUpdated = false;
-        }
 
+    private TestCondition createTasksUpdatedCondition(final TestStreamThread testStreamThread)
{
+        return new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                if (testStreamThread.streamTaskUpdated) {
+                    testStreamThread.streamTaskUpdated = false;
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        };
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 83b431c..9d881e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -27,6 +27,8 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
 
 import java.io.File;
 import java.io.IOException;
@@ -183,23 +185,30 @@ public class IntegrationTestUtils {
      * @throws InterruptedException
      * @throws AssertionError if the given wait time elapses
      */
-    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties
consumerConfig,
-                                                                                  String
topic,
-                                                                                  int expectedNumRecords,
+    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final
Properties consumerConfig,
+                                                                                  final String
topic,
+                                                                                  final int
expectedNumRecords,
                                                                                   long waitTime)
throws InterruptedException {
-        List<KeyValue<K, V>> accumData = new ArrayList<>();
-        long startTime = System.currentTimeMillis();
-        while (true) {
-            List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig);
-            accumData.addAll(readData);
-            if (accumData.size() >= expectedNumRecords)
-                return accumData;
-            if (System.currentTimeMillis() > startTime + waitTime)
-                throw new AssertionError("Expected " +  expectedNumRecords +
+        final List<KeyValue<K, V>> accumData = new ArrayList<>();
+
+        TestCondition valuesRead = new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig);
+                accumData.addAll(readData);
+                return accumData.size() >= expectedNumRecords;
+            }
+        };
+
+        TestUtils.waitForCondition(valuesRead, waitTime);
+
+        if (accumData.size() < expectedNumRecords) {
+            throw new AssertionError("Expected " + expectedNumRecords +
                     " but received only " + accumData.size() +
                     " records before timeout " + waitTime + " ms");
-            Thread.sleep(Math.min(waitTime, 100L));
         }
+
+        return accumData;
     }
 
     public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig,
@@ -219,23 +228,31 @@ public class IntegrationTestUtils {
      * @throws InterruptedException
      * @throws AssertionError if the given wait time elapses
      */
-    public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig,
-                                                                String topic,
-                                                                int expectedNumRecords,
+    public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties
consumerConfig,
+                                                                final String topic,
+                                                                final int expectedNumRecords,
                                                                 long waitTime) throws InterruptedException
{
-        List<V> accumData = new ArrayList<>();
-        long startTime = System.currentTimeMillis();
-        while (true) {
-            List<V> readData = readValues(topic, consumerConfig, expectedNumRecords);
-            accumData.addAll(readData);
-            if (accumData.size() >= expectedNumRecords)
-                return accumData;
-            if (System.currentTimeMillis() > startTime + waitTime)
-                throw new AssertionError("Expected " +  expectedNumRecords +
+        final List<V> accumData = new ArrayList<>();
+
+        TestCondition valuesRead = new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                List<V> readData = readValues(topic, consumerConfig, expectedNumRecords);
+                accumData.addAll(readData);
+                return accumData.size() >= expectedNumRecords;
+            }
+        };
+
+        TestUtils.waitForCondition(valuesRead, waitTime);
+
+        if (accumData.size() < expectedNumRecords) {
+            throw new AssertionError("Expected " + expectedNumRecords +
                     " but received only " + accumData.size() +
                     " records before timeout " + waitTime + " ms");
-            Thread.sleep(Math.min(waitTime, 100L));
         }
+
+        return accumData;
+
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 62b283a..78dfa7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -38,10 +38,10 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateTestUtils;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.ProcessorTopologyTestDriver;
+import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -66,7 +66,7 @@ public class ProcessorTopologyTest {
     @Before
     public void setup() {
         // Create a new directory in which we'll put all of the state for this test, enabling
running tests in parallel ...
-        File localState = StateTestUtils.tempDir();
+        File localState = TestUtils.tempDir();
         Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");

http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index be5596d..ab274f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
 
 import java.io.File;
 import java.util.HashMap;
@@ -210,7 +211,7 @@ public class KeyValueStoreTestDriver<K, V> {
                 send(record, keySerializer, valueSerializer);
             }
         };
-        this.stateDir = StateTestUtils.tempDir();
+        this.stateDir = TestUtils.tempDir();
         this.stateDir.mkdirs();
 
         Properties props = new Properties();

http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java
deleted file mode 100644
index f348fc9..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * A utility for tests to create and manage unique and isolated directories on the file system for local state.
- */
-public class StateTestUtils {
-
-    private static final AtomicLong INSTANCE_COUNTER = new AtomicLong();
-
-    /**
-     * Create a new temporary directory that will be cleaned up automatically upon shutdown.
-     * @return the new directory that will exist; never null
-     */
-    public static File tempDir() {
-        try {
-            final File dir = Files.createTempDirectory(new File("/tmp").toPath(), "test").toFile();
-            dir.mkdirs();
-            dir.deleteOnExit();
-
-            Runtime.getRuntime().addShutdownHook(new Thread() {
-                @Override
-                public void run() {
-                    deleteDirectory(dir);
-                }
-            });
-            return dir;
-        } catch (IOException ex) {
-            throw new RuntimeException("Failed to create a temp dir", ex);
-        }
-    }
-
-    private static void deleteDirectory(File dir) {
-        if (dir != null && dir.exists()) {
-            try {
-                Files.walkFileTree(dir.toPath(), new SimpleFileVisitor<Path>() {
-                    @Override
-                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
-                        Files.delete(file);
-                        return FileVisitResult.CONTINUE;
-                    }
-
-                    @Override
-                    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
-                        Files.delete(dir);
-                        return FileVisitResult.CONTINUE;
-                    }
-
-                });
-            } catch (IOException e) {
-                // do nothing
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/88924b03/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
new file mode 100644
index 0000000..5a7bfa7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.test;
+
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.util.Properties;
+import java.util.UUID;
+
+public class StreamsTestUtils {
+
+    public static Properties getStreamsConfig(final String applicationId,
+                                              final String bootstrapServers,
+                                              final String keySerdeClassName,
+                                              final String valueSerdeClassName,
+                                              final Properties additional) {
+
+        Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, keySerdeClassName);
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/" + applicationId);
+        streamsConfiguration.putAll(additional);
+        return streamsConfiguration;
+
+    }
+
+    /**
+     * Streams configuration with a random generated UUID for the application id
+     */
+    public static Properties getStreamsConfig(String bootstrapServer, String keySerdeClassName, String valueSerdeClassName) {
+        return getStreamsConfig(UUID.randomUUID().toString(),
+                bootstrapServer,
+                keySerdeClassName,
+                valueSerdeClassName,
+                new Properties());
+    }
+
+}


Mime
View raw message