Repository: kafka
Updated Branches:
refs/heads/trunk 16469d7f9 -> 2586226a9
KAFKA-4058: Failure in org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterReset
- use AdminTool to check for active consumer group
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes #1767 from mjsax/kafka-4058-trunk
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2586226a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2586226a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2586226a
Branch: refs/heads/trunk
Commit: 2586226a9a5300fea427ca001608ad86d393df1b
Parents: 16469d7
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Tue Sep 6 23:02:41 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Sep 6 23:02:41 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/test/TestUtils.java | 112 +++++++++--------
.../main/scala/kafka/tools/StreamsResetter.java | 2 +-
.../integration/ResetIntegrationTest.java | 123 +++++++++++++------
3 files changed, 147 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2586226a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 44026be..265661a 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,7 +16,16 @@
*/
package org.apache.kafka.test;
-import static java.util.Arrays.asList;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.Utils;
import java.io.File;
import java.io.IOException;
@@ -32,16 +41,7 @@ import java.util.Random;
import java.util.Set;
import java.util.UUID;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.Records;
-import org.apache.kafka.common.utils.Utils;
+import static java.util.Arrays.asList;
/**
* Helper functions for writing unit tests
@@ -62,51 +62,51 @@ public class TestUtils {
public static final Random RANDOM = new Random();
public static final long DEFAULT_MAX_WAIT_MS = 15000;
- public static Cluster singletonCluster(Map<String, Integer> topicPartitionCounts)
{
+ public static Cluster singletonCluster(final Map<String, Integer> topicPartitionCounts)
{
return clusterWith(1, topicPartitionCounts);
}
- public static Cluster singletonCluster(String topic, int partitions) {
+ public static Cluster singletonCluster(final String topic, final int partitions) {
return clusterWith(1, topic, partitions);
}
- public static Cluster clusterWith(int nodes, Map<String, Integer> topicPartitionCounts)
{
- Node[] ns = new Node[nodes];
+ public static Cluster clusterWith(final int nodes, final Map<String, Integer> topicPartitionCounts)
{
+ final Node[] ns = new Node[nodes];
for (int i = 0; i < nodes; i++)
ns[i] = new Node(i, "localhost", 1969);
- List<PartitionInfo> parts = new ArrayList<>();
- for (Map.Entry<String, Integer> topicPartition : topicPartitionCounts.entrySet())
{
- String topic = topicPartition.getKey();
- int partitions = topicPartition.getValue();
+ final List<PartitionInfo> parts = new ArrayList<>();
+ for (final Map.Entry<String, Integer> topicPartition : topicPartitionCounts.entrySet())
{
+ final String topic = topicPartition.getKey();
+ final int partitions = topicPartition.getValue();
for (int i = 0; i < partitions; i++)
parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
}
return new Cluster(asList(ns), parts, Collections.<String>emptySet(), INTERNAL_TOPICS);
}
- public static Cluster clusterWith(int nodes, String topic, int partitions) {
+ public static Cluster clusterWith(final int nodes, final String topic, final int partitions)
{
return clusterWith(nodes, Collections.singletonMap(topic, partitions));
}
/**
* Generate an array of random bytes
- *
+ *
* @param size The size of the array
*/
- public static byte[] randomBytes(int size) {
- byte[] bytes = new byte[size];
+ public static byte[] randomBytes(final int size) {
+ final byte[] bytes = new byte[size];
SEEDED_RANDOM.nextBytes(bytes);
return bytes;
}
/**
* Generate a random string of letters and digits of the given length
- *
+ *
* @param len The length of the string
* @return The random string
*/
- public static String randomString(int len) {
- StringBuilder b = new StringBuilder();
+ public static String randomString(final int len) {
+ final StringBuilder b = new StringBuilder();
for (int i = 0; i < len; i++)
b.append(LETTERS_AND_DIGITS.charAt(SEEDED_RANDOM.nextInt(LETTERS_AND_DIGITS.length())));
return b.toString();
@@ -117,7 +117,7 @@ public class TestUtils {
* suffix to generate its name.
*/
public static File tempFile() throws IOException {
- File file = File.createTempFile("kafka", ".tmp");
+ final File file = File.createTempFile("kafka", ".tmp");
file.deleteOnExit();
return file;
@@ -128,14 +128,15 @@ public class TestUtils {
*
* @param prefix The prefix of the temporary directory, if null using "kafka-" as default
prefix
*/
- public static File tempDirectory(String prefix) {
+ public static File tempDirectory(final String prefix) {
return tempDirectory(null, prefix);
}
/**
* Create a temporary relative directory in the default temporary-file directory with
a
* prefix of "kafka-"
- * @return the temporary directory just created.
+ *
+ * @return the temporary directory just created.
*/
public static File tempDirectory() {
return tempDirectory(null);
@@ -147,13 +148,13 @@ public class TestUtils {
* @param parent The parent folder path name, if null using the default temporary-file
directory
* @param prefix The prefix of the temporary directory, if null using "kafka-" as default
prefix
*/
- public static File tempDirectory(Path parent, String prefix) {
+ public static File tempDirectory(final Path parent, String prefix) {
final File file;
prefix = prefix == null ? "kafka-" : prefix;
try {
file = parent == null ?
- Files.createTempDirectory(prefix).toFile() : Files.createTempDirectory(parent,
prefix).toFile();
- } catch (IOException ex) {
+ Files.createTempDirectory(prefix).toFile() : Files.createTempDirectory(parent,
prefix).toFile();
+ } catch (final IOException ex) {
throw new RuntimeException("Failed to create a temp dir", ex);
}
file.deleteOnExit();
@@ -174,13 +175,13 @@ public class TestUtils {
* `Record(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset,
int valueSize)` as this
* constructor does not include either of these fields.
*/
- public static ByteBuffer partitionRecordsBuffer(long offset, CompressionType compressionType,
Record... records) {
+ public static ByteBuffer partitionRecordsBuffer(final long offset, final CompressionType
compressionType, final Record... records) {
int bufferSize = 0;
- for (Record record : records)
+ for (final Record record : records)
bufferSize += Records.LOG_OVERHEAD + record.size();
- ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
- MemoryRecords memoryRecords = MemoryRecords.emptyRecords(buffer, compressionType);
- for (Record record : records)
+ final ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+ final MemoryRecords memoryRecords = MemoryRecords.emptyRecords(buffer, compressionType);
+ for (final Record record : records)
memoryRecords.append(offset, record);
memoryRecords.close();
return memoryRecords.buffer();
@@ -200,7 +201,7 @@ public class TestUtils {
return properties;
}
- public static Properties producerConfig(final String bootstrapServers, Class keySerializer,
Class valueSerializer) {
+ public static Properties producerConfig(final String bootstrapServers, final Class keySerializer,
final Class valueSerializer) {
return producerConfig(bootstrapServers, keySerializer, valueSerializer, new Properties());
}
@@ -220,21 +221,32 @@ public class TestUtils {
return consumerConfig;
}
+ public static Properties consumerConfig(final String bootstrapServers,
+ final String groupId,
+ final Class keyDeserializer,
+ final Class valueDeserializer) {
+ return consumerConfig(bootstrapServers,
+ groupId,
+ keyDeserializer,
+ valueDeserializer,
+ new Properties());
+ }
+
/**
* returns consumer config with random UUID for the Group ID
*/
- public static Properties consumerConfig(final String bootstrapServers, Class keyDeserializer,
Class valueDeserializer) {
+ public static Properties consumerConfig(final String bootstrapServers, final Class keyDeserializer,
final Class valueDeserializer) {
return consumerConfig(bootstrapServers,
- UUID.randomUUID().toString(),
- keyDeserializer,
- valueDeserializer,
- new Properties());
+ UUID.randomUUID().toString(),
+ keyDeserializer,
+ valueDeserializer,
+ new Properties());
}
/**
- * uses default value of 15 seconds for timeout
+ * uses default value of 15 seconds for timeout
*/
- public static void waitForCondition(TestCondition testCondition, String conditionDetails)
throws InterruptedException {
+ public static void waitForCondition(final TestCondition testCondition, final String conditionDetails)
throws InterruptedException {
waitForCondition(testCondition, DEFAULT_MAX_WAIT_MS, conditionDetails);
}
@@ -244,8 +256,8 @@ public class TestUtils {
* without unnecessarily increasing test time (as the condition is checked frequently).
The longer timeout is needed to
* avoid transient failures due to slow or overloaded machines.
*/
- public static void waitForCondition(TestCondition testCondition, long maxWaitMs, String
conditionDetails) throws InterruptedException {
- long startTime = System.currentTimeMillis();
+ public static void waitForCondition(final TestCondition testCondition, final long maxWaitMs,
String conditionDetails) throws InterruptedException {
+ final long startTime = System.currentTimeMillis();
while (!testCondition.conditionMet() && ((System.currentTimeMillis() - startTime)
< maxWaitMs)) {
Thread.sleep(Math.min(maxWaitMs, 100L));
http://git-wip-us.apache.org/repos/asf/kafka/blob/2586226a/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 22b8bd6..7153790 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -90,7 +90,7 @@ public class StreamsResetter {
adminClient = AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption));
final String groupId = this.options.valueOf(applicationIdOption);
- if (adminClient.describeConsumerGroup(groupId).get().size() != 0) {
+ if (!adminClient.describeGroup(groupId).members().isEmpty()) {
throw new IllegalStateException("Consumer group '" + groupId + "' is still
active. " +
"Make sure to stop all running application instances before running the
reset tool.");
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2586226a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 79ec117..0f1717c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -16,10 +16,11 @@
*/
package org.apache.kafka.streams.integration;
+import kafka.admin.AdminClient;
import kafka.tools.StreamsResetter;
+import kafka.utils.MockTime;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
@@ -37,11 +38,13 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import java.util.Collections;
@@ -61,6 +64,7 @@ public class ResetIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+ private final MockTime mockTime = CLUSTER.time;
private static final String APP_ID = "cleanup-integration-test";
private static final String INPUT_TOPIC = "inputTopic";
@@ -72,6 +76,10 @@ public class ResetIntegrationTest {
private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
+ private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
+
+ private AdminClient adminClient = null;
+
@BeforeClass
public static void startKafkaCluster() throws Exception {
CLUSTER.createTopic(INPUT_TOPIC);
@@ -81,11 +89,27 @@ public class ResetIntegrationTest {
CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
}
- @Ignore
+ @Before
+ public void prepare() {
+ adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());
+ }
+
+ @After
+ public void cleanup() {
+ if (adminClient != null) {
+ adminClient.close();
+ adminClient = null;
+ }
+ }
+
@Test
public void testReprocessingFromScratchAfterReset() throws Exception {
final Properties streamsConfiguration = prepareTest();
- final Properties resultTopicConsumerConfig = prepareResultConsumer();
+ final Properties resultTopicConsumerConfig = TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
+ LongDeserializer.class,
+ LongDeserializer.class);
prepareInputData();
final KStreamBuilder builder = setupTopology(OUTPUT_TOPIC_2);
@@ -93,25 +117,42 @@ public class ResetIntegrationTest {
// RUN
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
- final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig,
OUTPUT_TOPIC, 10);
+ final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ resultTopicConsumerConfig,
+ OUTPUT_TOPIC,
+ 10);
// receive only first values to make sure intermediate user topic is not consumed
completely
// => required to test "seekToEnd" for intermediate topics
- final KeyValue<Object, Object> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig,
OUTPUT_TOPIC_2, 1).get(0);
+ final KeyValue<Object, Object> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ resultTopicConsumerConfig,
+ OUTPUT_TOPIC_2,
+ 1
+ ).get(0);
streams.close();
+ TestUtils.waitForCondition(consumerGroupInactive, 5 * STREAMS_CONSUMER_TIMEOUT,
+ "Streams Application consumer group did not time out after " + (5 * STREAMS_CONSUMER_TIMEOUT)
+ " ms.");
// RESET
- Utils.sleep(STREAMS_CONSUMER_TIMEOUT);
streams.cleanUp();
cleanGlobal();
+ TestUtils.waitForCondition(consumerGroupInactive, 5 * CLEANUP_CONSUMER_TIMEOUT,
+ "Reset Tool consumer group did not time out after " + (5 * CLEANUP_CONSUMER_TIMEOUT)
+ " ms.");
+
assertInternalTopicsGotDeleted();
- Utils.sleep(CLEANUP_CONSUMER_TIMEOUT);
// RE-RUN
streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
streams.start();
- final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig,
OUTPUT_TOPIC, 10);
- final KeyValue<Object, Object> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig,
OUTPUT_TOPIC_2_RERUN, 1).get(0);
+ final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ resultTopicConsumerConfig,
+ OUTPUT_TOPIC,
+ 10);
+ final KeyValue<Object, Object> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ resultTopicConsumerConfig,
+ OUTPUT_TOPIC_2_RERUN,
+ 1
+ ).get(0);
streams.close();
assertThat(resultRerun, equalTo(result));
@@ -137,35 +178,29 @@ public class ResetIntegrationTest {
return streamsConfiguration;
}
- private Properties prepareResultConsumer() {
- final Properties resultTopicConsumerConfig = new Properties();
- resultTopicConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- resultTopicConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-standard-consumer-"
+ OUTPUT_TOPIC);
- resultTopicConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- resultTopicConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
- resultTopicConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
-
- return resultTopicConsumerConfig;
- }
-
private void prepareInputData() throws Exception {
- final Properties producerConfig = new Properties();
- producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
- producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
- producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
- producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(0L, "aaa")), producerConfig, 10L);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(1L, "bbb")), producerConfig, 20L);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(0L, "ccc")), producerConfig, 30L);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(1L, "ddd")), producerConfig, 40L);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(0L, "eee")), producerConfig, 50L);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(1L, "fff")), producerConfig, 60L);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(0L, "ggg")), producerConfig, 61L);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(1L, "hhh")), producerConfig, 62L);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(0L, "iii")), producerConfig, 63L);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(1L, "jjj")), producerConfig, 64L);
+ final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(),
LongSerializer.class, StringSerializer.class);
+
+ mockTime.sleep(10);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds());
+ mockTime.sleep(10);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds());
+ mockTime.sleep(10);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds());
+ mockTime.sleep(10);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds());
+ mockTime.sleep(10);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds());
+ mockTime.sleep(10);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds());
+ mockTime.sleep(1);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds());
+ mockTime.sleep(1);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds());
+ mockTime.sleep(1);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds());
+ mockTime.sleep(1);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new
KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
}
private KStreamBuilder setupTopology(final String outputTopic2) {
@@ -188,12 +223,15 @@ public class ResetIntegrationTest {
final KStream<Long, Long> windowedCounts = input
.through(INTERMEDIATE_USER_TOPIC)
.map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
+ private long sleep = 1000;
+
@Override
public KeyValue<Long, String> apply(final Long key, final String value)
{
// must sleep long enough to avoid processing the whole intermediate
topic before application gets stopped
// => want to test "skip over" unprocessed records
// increasing the sleep time only has disadvantage that test run time
is increased
- Utils.sleep(1000);
+ mockTime.sleep(sleep);
+ sleep *= 2;
return new KeyValue<>(key, value);
}
})
@@ -258,4 +296,11 @@ public class ResetIntegrationTest {
assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup));
}
+ private class WaitUntilConsumerGroupGotClosed implements TestCondition {
+ @Override
+ public boolean conditionMet() {
+ return adminClient.describeGroup(APP_ID).members().isEmpty();
+ }
+ }
+
}
|