kafka-commits mailing list archives

From: guozh...@apache.org
Subject: kafka git commit: KAFKA-5140: Fix reset integration test
Date: Mon, 23 Oct 2017 19:35:37 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a3c4ab242 -> d3f24798f


KAFKA-5140: Fix reset integration test

A couple of root causes of this flaky test are fixed:

1. The MockTime was incorrectly shared across multiple test methods in the class as a class
rule. It is now set on each test case instead; the Scala MockTime dependency is also removed.

2. A list-topics request may omit deleted topics even though their ZK paths have not yet been
removed, so the delete-check-recreate pattern may fail to recreate the topic at all. The check
now reads the ZK path directly instead.

Another minor fix removes the misleading wait-condition error message, since the accumData
it printed is always empty at that point.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>

Closes #4095 from guozhangwang/KMinor-reset-integration-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d3f24798
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d3f24798
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d3f24798

Branch: refs/heads/trunk
Commit: d3f24798f92a6e4d6038d28b63daf3ac9d14fcbf
Parents: a3c4ab2
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Mon Oct 23 12:35:31 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Oct 23 12:35:31 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/utils/MockTime.java |  9 +++
 .../internals/InternalTopicManager.java         | 17 ++++--
 .../internals/StreamPartitionAssignor.java      | 18 +++---
 .../AbstractResetIntegrationTest.java           | 64 ++++----------------
 .../integration/ResetIntegrationTest.java       |  5 +-
 .../ResetIntegrationWithSslTest.java            |  5 +-
 .../integration/utils/EmbeddedKafkaCluster.java | 30 +++++++--
 .../integration/utils/IntegrationTestUtils.java |  9 +--
 8 files changed, 70 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d3f24798/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
index af67937..be04aed 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
@@ -73,4 +73,13 @@ public class MockTime implements Time {
         highResTimeNs.addAndGet(TimeUnit.MILLISECONDS.toNanos(ms));
     }
 
+    public void setCurrentTimeMs(long newMs) {
+        long oldMs = timeMs.getAndSet(newMs);
+
+        // does not allow to set to an older timestamp
+        if (oldMs > newMs)
+            throw new IllegalArgumentException("Setting the time to " + newMs + " while current time " + oldMs + " is newer; this is not allowed");
+
+        highResTimeNs.set(TimeUnit.MILLISECONDS.toNanos(newMs));
+    }
 }
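
The new setter only lets the mock clock move forward. An illustrative use, not part of the commit:

    import org.apache.kafka.common.utils.MockTime;

    class SetTimeExample {
        public static void main(String[] args) {
            final MockTime time = new MockTime();
            time.setCurrentTimeMs(time.milliseconds() + 1000L); // ok: newer timestamp
            try {
                time.setCurrentTimeMs(0L); // older than the current time
            } catch (final IllegalArgumentException expected) {
                // rejected, as enforced above
            }
        }
    }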

http://git-wip-us.apache.org/repos/asf/kafka/blob/d3f24798/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index cd57867..a038d09 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -17,10 +17,10 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -31,23 +31,30 @@ import java.util.concurrent.TimeUnit;
 
 public class InternalTopicManager {
 
-    private static final Logger log = LoggerFactory.getLogger(InternalTopicManager.class);
+    static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
+
     public static final String CLEANUP_POLICY_PROP = "cleanup.policy";
     public static final String RETENTION_MS = "retention.ms";
-    public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
     private static final int MAX_TOPIC_READY_TRY = 5;
+
+    private final Logger log;
     private final Time time;
     private final long windowChangeLogAdditionalRetention;
 
     private final int replicationFactor;
     private final StreamsKafkaClient streamsKafkaClient;
 
-    public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor,
-                                final long windowChangeLogAdditionalRetention, final Time time) {
+    public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient,
+                                final int replicationFactor,
+                                final long windowChangeLogAdditionalRetention,
+                                final Time time) {
         this.streamsKafkaClient = streamsKafkaClient;
         this.replicationFactor = replicationFactor;
         this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention;
         this.time = time;
+
+        LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName()));
+        this.log = logContext.logger(getClass());
     }
 
     /**
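
For context, a LogContext prefixes every statement emitted by the loggers it creates, which is how both files in this commit tag log lines with the owning stream thread. A self-contained illustrative sketch (class name and message are made up):

    import org.apache.kafka.common.utils.LogContext;
    import org.slf4j.Logger;

    class LogContextExample {
        public static void main(String[] args) {
            final LogContext logContext = new LogContext(
                String.format("stream-thread [%s] ", Thread.currentThread().getName()));
            final Logger log = logContext.logger(LogContextExample.class);
            log.info("created topics"); // emitted as "stream-thread [main] created topics"
        }
    }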

http://git-wip-us.apache.org/repos/asf/kafka/blob/d3f24798/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 621eb15..9e505a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Configurable;
@@ -200,9 +201,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
     public void configure(Map<String, ?> configs) {
         numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
 
-        //Initializing the logger without threadDataProvider name because provider name is not known/verified at this point
-        logPrefix = String.format("stream-thread ");
-        LogContext logContext = new LogContext(logPrefix);
+        // Setting the logger with the passed in client thread name
+        logPrefix = String.format("stream-thread [%s] ", configs.get(CommonClientConfigs.CLIENT_ID_CONFIG));
+        final LogContext logContext = new LogContext(logPrefix);
         this.log = logContext.logger(getClass());
 
         Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
@@ -221,11 +222,6 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
         threadDataProvider = (ThreadDataProvider) o;
         threadDataProvider.setThreadMetadataProvider(this);
 
-        //Reassigning the logger with threadDataProvider name
-        logPrefix = String.format("stream-thread [%s] ", threadDataProvider.name());
-        logContext = new LogContext(logPrefix);
-        this.log = logContext.logger(getClass());
-
         String userEndPoint = (String) configs.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
         if (userEndPoint != null && !userEndPoint.isEmpty()) {
             try {
@@ -502,8 +498,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
             states.put(entry.getKey(), entry.getValue().state);
         }
 
-        log.debug("{} Assigning tasks {} to clients {} with number of replicas {}",
-                logPrefix, partitionsForTask.keySet(), states, numStandbyReplicas);
+        log.debug("Assigning tasks {} to clients {} with number of replicas {}",
+                partitionsForTask.keySet(), states, numStandbyReplicas);
 
         final StickyTaskAssignor<UUID> taskAssignor = new StickyTaskAssignor<>(states, partitionsForTask.keySet());
         taskAssignor.assign(numStandbyReplicas);
@@ -688,7 +684,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
             }
         }
 
-        log.debug("Completed validating internal topics in partition assignor");
+        log.debug("Completed validating internal topics in partition assignor.");
     }
 
     private boolean allTopicsCreated(final Set<String> topicNamesToMakeReady, final Map<InternalTopicConfig, Integer> topicsToMakeReady) {
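
A note on the configure() change above: the assignor now derives its log prefix from the client id that is already present in the passed-in config map, rather than reassigning the logger once the thread data provider is known. A small self-contained sketch; the client id value is made up:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.CommonClientConfigs;

    class LogPrefixExample {
        public static void main(String[] args) {
            // configure() receives the consumer configs, which include client.id
            final Map<String, Object> configs = new HashMap<>();
            configs.put(CommonClientConfigs.CLIENT_ID_CONFIG, "app-StreamThread-1-consumer");
            final String logPrefix = String.format("stream-thread [%s] ",
                    configs.get(CommonClientConfigs.CLIENT_ID_CONFIG));
            System.out.println(logPrefix); // "stream-thread [app-StreamThread-1-consumer] "
        }
    }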

http://git-wip-us.apache.org/repos/asf/kafka/blob/d3f24798/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 6ab7141..3252f6b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -18,10 +18,8 @@ package org.apache.kafka.streams.integration;
 
 import kafka.admin.AdminClient;
 import kafka.tools.StreamsResetter;
-import kafka.utils.MockTime;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -29,6 +27,7 @@ import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -52,10 +51,8 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -108,8 +105,13 @@ abstract class AbstractResetIntegrationTest {
 
     void beforePrepareTest() throws Exception {
         ++testNo;
-        bootstrapServers = cluster.bootstrapServers();
         mockTime = cluster.time;
+        bootstrapServers = cluster.bootstrapServers();
+
+        // we align time to seconds to get clean window boundaries and thus ensure the same result for each run
+        // otherwise, input records could fall into different windows for different runs depending on the initial mock time
+        final long alignedTime = (System.currentTimeMillis() / 1000) * 1000;
+        mockTime.setCurrentTimeMs(alignedTime);
 
         Properties sslConfig = getClientSslConfig();
         if (sslConfig == null) {
@@ -161,14 +163,6 @@ abstract class AbstractResetIntegrationTest {
 
         // RUN
         KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
-        final KafkaStreams handlerReference = streams;
-        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(Thread t, Throwable e) {
-                handlerReference.close(10, TimeUnit.SECONDS);
-                log.error("Streams application failed: ", e);
-            }
-        });
         streams.start();
         final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
             resultTopicConsumerConfig,
@@ -181,14 +175,6 @@ abstract class AbstractResetIntegrationTest {
 
         // RESET
         streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
-        final KafkaStreams handlerReference2 = streams;
-        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(Thread t, Throwable e) {
-                handlerReference2.close(10, TimeUnit.SECONDS);
-                log.error("Streams application failed: ", e);
-            }
-        });
         streams.cleanUp();
         cleanGlobal(null, sslConfig);
         TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
@@ -229,14 +215,6 @@ abstract class AbstractResetIntegrationTest {
 
         // RUN
         KafkaStreams streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfiguration);
-        final KafkaStreams handlerReference = streams;
-        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(Thread t, Throwable e) {
-                handlerReference.close(10, TimeUnit.SECONDS);
-                log.error("Streams application failed: ", e);
-            }
-        });
         streams.start();
         final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
             resultTopicConsumerConfig,
@@ -269,14 +247,6 @@ abstract class AbstractResetIntegrationTest {
 
         // RESET
         streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
-        final KafkaStreams handlerReference2 = streams;
-        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(Thread t, Throwable e) {
-                handlerReference2.close(10, TimeUnit.SECONDS);
-                log.error("Streams application failed: ", e);
-            }
-        });
         streams.cleanUp();
         cleanGlobal(INTERMEDIATE_USER_TOPIC, sslConfig);
         TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
@@ -447,27 +417,19 @@ abstract class AbstractResetIntegrationTest {
         cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
 
+        log.info("Calling StreamsResetter with parameters {} and configs {}", parameters, cleanUpConfig);
+
         final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
         Assert.assertEquals(0, exitCode);
     }
 
     private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception {
-        final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>();
-        expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);
+        // do not use list topics request, but read from the embedded cluster's zookeeper path directly to confirm
         if (intermediateUserTopic != null) {
-            expectedRemainingTopicsAfterCleanup.add(intermediateUserTopic);
+            cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, TestUtils.GROUP_METADATA_TOPIC_NAME, intermediateUserTopic);
+        } else {
+            cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, TestUtils.GROUP_METADATA_TOPIC_NAME);
         }
-        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC);
-        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2);
-        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN);
-        expectedRemainingTopicsAfterCleanup.add("__consumer_offsets");
-
-        final Set<String> allTopics = new HashSet<>();
-
-        final ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
-        listTopicsOptions.listInternal(true);
-        allTopics.addAll(kafkaAdminClient.listTopics(listTopicsOptions).names().get(30000, TimeUnit.MILLISECONDS));
-        assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d3f24798/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 549f8f1..a72bad4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -41,10 +41,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
         // expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially
         // very long sleep times
         props.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L);
-        // we align time to seconds to get clean window boundaries and thus ensure the same result for each run
-        // otherwise, input records could fall into different windows for different runs depending on the initial mock time
-        final long alignedTime = (System.currentTimeMillis() / 1000) * 1000;
-        CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props, alignedTime);
+        CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props);
         cluster = CLUSTER;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d3f24798/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
index f0dd0c8..e58b18c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
@@ -60,10 +60,7 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
         props.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0");
         props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "SSL");
         props.putAll(sslConfig);
-        // we align time to seconds to get clean window boundaries and thus ensure the same result for each run
-        // otherwise, input records could fall into different windows for different runs depending on the initial mock time
-        final long alignedTime = (System.currentTimeMillis() / 1000) * 1000;
-        CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props, alignedTime);
+        CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props);
         cluster = CLUSTER;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d3f24798/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 6a873d2..367e489 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -76,7 +76,6 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         brokers = new KafkaEmbedded[numBrokers];
         this.brokerConfig = brokerConfig;
         time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
-
     }
 
     /**
@@ -122,7 +121,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
     /**
      * Stop the Kafka cluster.
      */
-    public void stop() {
+    private void stop() {
         for (final KafkaEmbedded broker : brokers) {
             broker.stop();
         }
@@ -284,22 +283,41 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         createTopics(topics);
     }
 
+    public void waitForRemainingTopics(final long timeoutMs, final String... topics) throws InterruptedException {
+        TestUtils.waitForCondition(new TopicsRemainingCondition(topics), timeoutMs, "Topics are not expected after " + timeoutMs + " milli seconds.");
+    }
+
     private final class TopicsDeletedCondition implements TestCondition {
-        final Set<String> deletedTopic = new HashSet<>();
+        final Set<String> deletedTopics = new HashSet<>();
 
         private TopicsDeletedCondition(final String... topics) {
-            Collections.addAll(deletedTopic, topics);
+            Collections.addAll(deletedTopics, topics);
+        }
+
+        @Override
+        public boolean conditionMet() {
+            final Set<String> allTopics = new HashSet<>();
+            allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+            return !allTopics.removeAll(deletedTopics);
+        }
+    }
+
+    private final class TopicsRemainingCondition implements TestCondition {
+        final Set<String> remainingTopics = new HashSet<>();
+
+        private TopicsRemainingCondition(final String... topics) {
+            Collections.addAll(remainingTopics, topics);
         }
 
         @Override
         public boolean conditionMet() {
             final Set<String> allTopics = new HashSet<>();
             allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
-            return !allTopics.removeAll(deletedTopic);
+            return allTopics.equals(remainingTopics);
         }
     }
 
-    public List<KafkaServer> brokers() {
+    private List<KafkaServer> brokers() {
         final List<KafkaServer> servers = new ArrayList<>();
         for (final KafkaEmbedded broker : brokers) {
             servers.add(broker.kafkaServer());

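The two wait conditions above check different things: TopicsDeletedCondition passes when none of the given topics intersect the cluster's topic list (removeAll returns false when nothing was removed), while the new TopicsRemainingCondition passes only when the cluster holds exactly the expected set. A self-contained sketch of the set logic with made-up topic names:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    class TopicConditionDemo {
        public static void main(String[] args) {
            final Set<String> allTopics = new HashSet<>(Arrays.asList("input", "output"));

            // deleted-check: true when none of the deleted topics remain
            final boolean deleted =
                !new HashSet<>(allTopics).removeAll(Arrays.asList("tmp-repartition"));

            // remaining-check: true only when exactly the expected topics remain
            final boolean remaining =
                allTopics.equals(new HashSet<>(Arrays.asList("input", "output")));

            System.out.println(deleted + " " + remaining); // prints "true true"
        }
    }
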
http://git-wip-us.apache.org/repos/asf/kafka/blob/d3f24798/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 4dfd075..c0eeab3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -154,9 +154,7 @@ public class IntegrationTestUtils {
                     return accumData.size() >= expectedNumRecords;
                 }
             };
-            final String conditionDetails =
-                "Expecting " + expectedNumRecords + " records from topic " + topic +
-                    " while only received " + accumData.size() + ": " + accumData;
+            final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
             TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
         }
         return accumData;
@@ -195,9 +193,7 @@ public class IntegrationTestUtils {
                     return accumData.size() >= expectedNumRecords;
                 }
             };
-            final String conditionDetails =
-                "Expecting " + expectedNumRecords + " records from topic " + topic +
-                    " while only received " + accumData.size() + ": " + accumData;
+            final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
             TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
         }
         return accumData;
@@ -317,6 +313,7 @@ public class IntegrationTestUtils {
             continueConsuming(consumedValues.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
             final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+
             for (final ConsumerRecord<K, V> record : records) {
                 consumedValues.add(new KeyValue<>(record.key(), record.value()));
             }
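
For reference, the wait pattern changed above, in isolation: poll until the condition holds, otherwise fail with the (now shorter) message. A self-contained sketch using the same test-utils types as the diff; the data and topic name are made up:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.test.TestCondition;
    import org.apache.kafka.test.TestUtils;

    class WaitConditionExample {
        public static void main(String[] args) throws InterruptedException {
            final List<String> accumData = new ArrayList<>();
            final int expectedNumRecords = 0; // trivially satisfied for the demo
            final TestCondition valuesRead = new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return accumData.size() >= expectedNumRecords;
                }
            };
            TestUtils.waitForCondition(valuesRead, 30000L,
                "Did not receive all " + expectedNumRecords + " records from topic demo-topic");
        }
    }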

