kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-3678: Removed sleep from streams integration tests
Date Sat, 28 May 2016 01:39:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 756ec494d -> 7b7c4a7bb


KAFKA-3678: Removed sleep from streams integration tests

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1439 from enothereska/KAFKA-3678-timeouts1
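
Summary of the change: each of the six integration tests below previously called
Thread.sleep(5000) (or sleep(10000)) after streams.start() so the topology would be
up before the test input was produced. The sleeps are removed and the tests instead
set the consumer's auto.offset.reset to "earliest", so records produced before the
topology's consumer has started are still read from the beginning of the topic.
A minimal sketch of the resulting test setup is shown here; the application id,
broker/ZooKeeper addresses, and topic names are illustrative, not taken from this
commit:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class NoSleepExample {
        public static void main(String[] args) {
            Properties streamsConfiguration = new Properties();
            // Hypothetical application id and broker/ZooKeeper addresses, for illustration only.
            streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "no-sleep-example");
            streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
            // The line added by this commit: start consuming from the earliest offset,
            // so input produced before the topology is fully running is not missed.
            streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            // Pass-through topology, as in PassThroughIntegrationTest.
            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input-topic").to("output-topic");

            KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
            streams.start();
            // Produce test input and verify output here; no Thread.sleep(...) is needed.
        }
    }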


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7b7c4a7b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7b7c4a7b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7b7c4a7b

Branch: refs/heads/trunk
Commit: 7b7c4a7bb0fd25ddca4e4bdde9e605b3d5a1ba70
Parents: 756ec49
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Fri May 27 18:40:11 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri May 27 18:40:11 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/integration/FanoutIntegrationTest.java      | 5 +----
 .../streams/integration/InternalTopicIntegrationTest.java     | 7 ++-----
 .../apache/kafka/streams/integration/JoinIntegrationTest.java | 5 +----
 .../kafka/streams/integration/MapFunctionIntegrationTest.java | 5 +----
 .../kafka/streams/integration/PassThroughIntegrationTest.java | 5 +----
 .../kafka/streams/integration/WordCountIntegrationTest.java   | 7 ++-----
 6 files changed, 8 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7b7c4a7b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 5199caa..6494533 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -97,6 +97,7 @@ public class FanoutIntegrationTest {
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);
         KStream<byte[], String> stream2 = stream1.mapValues(
@@ -119,10 +120,6 @@ public class FanoutIntegrationTest {
         KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
-        // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
-        // of the input data we produce below).
-        Thread.sleep(5000);
-
         //
         // Step 2: Produce some input data to the input topic.
         //

http://git-wip-us.apache.org/repos/asf/kafka/blob/7b7c4a7b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index e431b57..809a238 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.integration;
 
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -123,7 +124,7 @@ public class InternalTopicIntegrationTest {
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
-
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         KStreamBuilder builder = new KStreamBuilder();
 
         KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
@@ -149,10 +150,6 @@ public class InternalTopicIntegrationTest {
         KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
-        // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
-        // of the input data we produce below).
-        Thread.sleep(5000);
-
         //
         // Step 2: Produce some input data to the input topic.
         //

http://git-wip-us.apache.org/repos/asf/kafka/blob/7b7c4a7b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 4f318ec..9e9d366 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -141,6 +141,7 @@ public class JoinIntegrationTest {
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         // Explicitly place the state directory under /tmp so that we can remove it via
         // `purgeLocalStreamsState` below.  Once Streams is updated to expose the effective
         // StreamsConfig configuration (so we can retrieve whatever state directory Streams came up
@@ -217,10 +218,6 @@ public class JoinIntegrationTest {
         KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
-        // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
-        // of the input data we produce below).
-        Thread.sleep(10000);
-
         //
         // Step 2: Publish user-region information.
         //

http://git-wip-us.apache.org/repos/asf/kafka/blob/7b7c4a7b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
index 3c37aa1..2096d9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
@@ -79,6 +79,7 @@ public class MapFunctionIntegrationTest {
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStream<byte[], String> input = builder.stream(DEFAULT_INPUT_TOPIC);
         KStream<byte[], String> uppercased = input.mapValues(new ValueMapper<String, String>() {
@@ -92,10 +93,6 @@ public class MapFunctionIntegrationTest {
         KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
-        // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
-        // of the input data we produce below).
-        Thread.sleep(5000);
-
         //
         // Step 2: Produce some input data to the input topic.
         //

http://git-wip-us.apache.org/repos/asf/kafka/blob/7b7c4a7b/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
index e81d21c..4e6dcb2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
@@ -72,6 +72,7 @@ public class PassThroughIntegrationTest {
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         // Write the input data as-is to the output topic.
         builder.stream(DEFAULT_INPUT_TOPIC).to(DEFAULT_OUTPUT_TOPIC);
@@ -79,10 +80,6 @@ public class PassThroughIntegrationTest {
         KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
-        // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
-        // of the input data we produce below).
-        Thread.sleep(5000);
-
         //
         // Step 2: Produce some input data to the input topic.
         //

http://git-wip-us.apache.org/repos/asf/kafka/blob/7b7c4a7b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
index c86409a..e00cd13 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
@@ -83,6 +83,7 @@ public class WordCountIntegrationTest {
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         // Explicitly place the state directory under /tmp so that we can remove it via
         // `purgeLocalStreamsState` below.  Once Streams is updated to expose the effective
         // StreamsConfig configuration (so we can retrieve whatever state directory Streams came up
@@ -115,11 +116,7 @@ public class WordCountIntegrationTest {
 
         KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
-
-        // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
-        // of the input data we produce below).
-        Thread.sleep(5000);
-
+        
         //
         // Step 2: Produce some input data to the input topic.
         //

