kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4859: Set shorter commit interval for integration tests with caching
Date Fri, 17 Mar 2017 05:36:09 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b8fe2bb56 -> cfda0368d


KAFKA-4859: Set shorter commit interval for integration tests with caching

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Armin Braun, Damian Guy, Jason Gustafson

Closes #2682 from guozhangwang/K4859-cache-commit-interval


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cfda0368
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cfda0368
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cfda0368

Branch: refs/heads/trunk
Commit: cfda0368dbb7ddd014780e1ed1dedcae2778f403
Parents: b8fe2bb
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Thu Mar 16 22:36:06 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Mar 16 22:36:06 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/integration/FanoutIntegrationTest.java      | 4 +++-
 .../integration/KStreamAggregationDedupIntegrationTest.java   | 7 +++++--
 .../streams/integration/KStreamKTableJoinIntegrationTest.java | 2 ++
 .../kafka/streams/integration/KStreamRepartitionJoinTest.java | 2 ++
 .../streams/integration/QueryableStateIntegrationTest.java    | 6 ++++--
 5 files changed, 16 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cfda0368/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 8450af5..421efc7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -70,6 +70,8 @@ import static org.junit.Assert.assertThat;
 @RunWith(Parameterized.class)
 public class FanoutIntegrationTest {
     private static final int NUM_BROKERS = 1;
+    private static final long COMMIT_INTERVAL_MS = 300L;
+
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private final MockTime mockTime = CLUSTER.time;
@@ -112,6 +114,7 @@ public class FanoutIntegrationTest {
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-integration-test");
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
 
@@ -174,5 +177,4 @@ public class FanoutIntegrationTest {
         streams.close();
         assertThat(actualValuesForC, equalTo(expectedValuesForC));
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfda0368/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index dcaa222..7b14cf8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -61,6 +61,8 @@ import static org.hamcrest.core.Is.is;
  */
 public class KStreamAggregationDedupIntegrationTest {
     private static final int NUM_BROKERS = 1;
+    private static final long COMMIT_INTERVAL_MS = 300L;
+
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER =
         new EmbeddedKafkaCluster(NUM_BROKERS);
@@ -90,7 +92,7 @@ public class KStreamAggregationDedupIntegrationTest {
             .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
 
         KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.<Integer, String>SelectValueMapper();
@@ -304,7 +306,8 @@ public class KStreamAggregationDedupIntegrationTest {
             valueDeserializer.getClass().getName());
         return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties,
             outputTopic,
-            numMessages, 60 * 1000);
+            numMessages,
+            60 * 1000);
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfda0368/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 036a3ee..e60530d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -63,6 +63,7 @@ import static org.junit.Assert.assertThat;
 @RunWith(Parameterized.class)
 public class KStreamKTableJoinIntegrationTest {
     private static final int NUM_BROKERS = 1;
+    private static final long COMMIT_INTERVAL_MS = 300L;
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@@ -90,6 +91,7 @@ public class KStreamKTableJoinIntegrationTest {
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
             TestUtils.tempDirectory().getPath());

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfda0368/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 46084a9..5d44255 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -64,6 +64,7 @@ import static org.hamcrest.core.Is.is;
 public class KStreamRepartitionJoinTest {
 
     private static final int NUM_BROKERS = 1;
+    private static final long COMMIT_INTERVAL_MS = 300L;
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@@ -104,6 +105,7 @@ public class KStreamRepartitionJoinTest {
         streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfda0368/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index fbb7089..a2b56d3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -80,6 +80,8 @@ import static org.hamcrest.core.IsEqual.equalTo;
 @RunWith(Parameterized.class)
 public class QueryableStateIntegrationTest {
     private static final int NUM_BROKERS = 1;
+    private static final long COMMIT_INTERVAL_MS = 300L;
+
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER =
         new EmbeddedKafkaCluster(NUM_BROKERS);
@@ -141,10 +143,10 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration
             .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("qs-test").getPath());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsConfiguration
-            .put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
 


Mime
View raw message