kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5140: Fix reset integration test
Date Mon, 23 Oct 2017 19:41:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 4701d4cc7 -> 33640106b


KAFKA-5140: Fix reset integration test

A couple of root causes of this flaky test are fixed:

1. The MockTime was incorrectly used across multiple test methods within the class, as a class
rule. Instead, we now set it on each test case; the Scala MockTime dependency is also removed.

2. The list-topics response may not contain the deleted topics while their ZK paths are yet to
be deleted, so the delete-check-recreate pattern may fail to successfully recreate the topic.
The check is changed to read from the ZK path directly instead.

Another minor fix is to remove the misleading wait-condition error message, as the accumData
is always empty at that point.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian.guy@gmail.com>,
Matthias J. Sax <matthias@confluent.io>

Closes #4095 from guozhangwang/KMinor-reset-integration-test

(cherry picked from commit d3f24798f92a6e4d6038d28b63daf3ac9d14fcbf)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/33640106
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/33640106
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/33640106

Branch: refs/heads/0.11.0
Commit: 33640106b430c1b19954dc076a500530ecd29992
Parents: 4701d4c
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Mon Oct 23 12:35:31 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Oct 23 12:41:35 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/utils/MockTime.java |  9 ++++++
 .../internals/InternalTopicManager.java         | 17 +++++++----
 .../integration/ResetIntegrationTest.java       |  8 ++----
 .../integration/utils/EmbeddedKafkaCluster.java | 30 ++++++++++++++++----
 .../integration/utils/IntegrationTestUtils.java |  9 ++----
 5 files changed, 51 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/33640106/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
index af67937..be04aed 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
@@ -73,4 +73,13 @@ public class MockTime implements Time {
         highResTimeNs.addAndGet(TimeUnit.MILLISECONDS.toNanos(ms));
     }
 
+    public void setCurrentTimeMs(long newMs) {
+        long oldMs = timeMs.getAndSet(newMs);
+
+        // does not allow to set to an older timestamp
+        if (oldMs > newMs)
+            throw new IllegalArgumentException("Setting the time to " + newMs + " while current
time " + oldMs + " is newer; this is not allowed");
+
+        highResTimeNs.set(TimeUnit.MILLISECONDS.toNanos(newMs));
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/33640106/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 8a0c7c0..9eabf2e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -17,10 +17,10 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -31,23 +31,30 @@ import java.util.concurrent.TimeUnit;
 
 public class InternalTopicManager {
 
-    private static final Logger log = LoggerFactory.getLogger(InternalTopicManager.class);
+    static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1,
TimeUnit.DAYS);
+
     public static final String CLEANUP_POLICY_PROP = "cleanup.policy";
     public static final String RETENTION_MS = "retention.ms";
-    public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1,
TimeUnit.DAYS);
     private static final int MAX_TOPIC_READY_TRY = 5;
+
+    private final Logger log;
     private final Time time;
     private final long windowChangeLogAdditionalRetention;
 
     private final int replicationFactor;
     private final StreamsKafkaClient streamsKafkaClient;
 
-    public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor,
-                                final long windowChangeLogAdditionalRetention, final Time
time) {
+    public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient,
+                                final int replicationFactor,
+                                final long windowChangeLogAdditionalRetention,
+                                final Time time) {
         this.streamsKafkaClient = streamsKafkaClient;
         this.replicationFactor = replicationFactor;
         this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention;
         this.time = time;
+
+        LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName()));
+        this.log = logContext.logger(getClass());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/33640106/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 7868981..c8ba2bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -73,10 +73,7 @@ public class ResetIntegrationTest {
         // expiration of connections by the brokers to avoid errors when `AdminClient` sends
requests after potentially
         // very long sleep times
         props.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L);
-        // we align time to seconds to get clean window boundaries and thus ensure the same
result for each run
-        // otherwise, input records could fall into different windows for different runs
depending on the initial mock time
-        final long alignedTime = (System.currentTimeMillis() / 1000) * 1000;
-        CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props, alignedTime);
+        CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props);
     }
 
     private static final String APP_ID = "cleanup-integration-test";
@@ -93,7 +90,7 @@ public class ResetIntegrationTest {
     private static int testNo = 0;
     private static AdminClient adminClient = null;
 
-    private final MockTime mockTime = CLUSTER.time;
+    private MockTime mockTime;
     private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
 
     @AfterClass
@@ -107,6 +104,7 @@ public class ResetIntegrationTest {
     @Before
     public void cleanup() throws Exception {
         ++testNo;
+        mockTime = CLUSTER.time;
 
         if (adminClient == null) {
             adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());

http://git-wip-us.apache.org/repos/asf/kafka/blob/33640106/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index e738bc6..911738d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -76,7 +76,6 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         brokers = new KafkaEmbedded[numBrokers];
         this.brokerConfig = brokerConfig;
         time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
-
     }
 
     /**
@@ -122,7 +121,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
     /**
      * Stop the Kafka cluster.
      */
-    public void stop() {
+    private void stop() {
         for (final KafkaEmbedded broker : brokers) {
             broker.stop();
         }
@@ -284,22 +283,41 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         createTopics(topics);
     }
 
+    public void waitForRemainingTopics(final long timeoutMs, final String... topics) throws
InterruptedException {
+        TestUtils.waitForCondition(new TopicsRemainingCondition(topics), timeoutMs, "Topics
are not expected after " + timeoutMs + " milli seconds.");
+    }
+
     private final class TopicsDeletedCondition implements TestCondition {
-        final Set<String> deletedTopic = new HashSet<>();
+        final Set<String> deletedTopics = new HashSet<>();
 
         private TopicsDeletedCondition(final String... topics) {
-            Collections.addAll(deletedTopic, topics);
+            Collections.addAll(deletedTopics, topics);
+        }
+
+        @Override
+        public boolean conditionMet() {
+            final Set<String> allTopics = new HashSet<>();
+            allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+            return !allTopics.removeAll(deletedTopics);
+        }
+    }
+
+    private final class TopicsRemainingCondition implements TestCondition {
+        final Set<String> remainingTopics = new HashSet<>();
+
+        private TopicsRemainingCondition(final String... topics) {
+            Collections.addAll(remainingTopics, topics);
         }
 
         @Override
         public boolean conditionMet() {
             final Set<String> allTopics = new HashSet<>();
             allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
-            return !allTopics.removeAll(deletedTopic);
+            return allTopics.equals(remainingTopics);
         }
     }
 
-    public List<KafkaServer> brokers() {
+    private List<KafkaServer> brokers() {
         final List<KafkaServer> servers = new ArrayList<>();
         for (final KafkaEmbedded broker : brokers) {
             servers.add(broker.kafkaServer());

http://git-wip-us.apache.org/repos/asf/kafka/blob/33640106/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 7cdc180..6f387c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -154,9 +154,7 @@ public class IntegrationTestUtils {
                     return accumData.size() >= expectedNumRecords;
                 }
             };
-            final String conditionDetails =
-                "Expecting " + expectedNumRecords + " records from topic " + topic +
-                    " while only received " + accumData.size() + ": " + accumData;
+            final String conditionDetails = "Did not receive all " + expectedNumRecords +
" records from topic " + topic;
             TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
         }
         return accumData;
@@ -195,9 +193,7 @@ public class IntegrationTestUtils {
                     return accumData.size() >= expectedNumRecords;
                 }
             };
-            final String conditionDetails =
-                "Expecting " + expectedNumRecords + " records from topic " + topic +
-                    " while only received " + accumData.size() + ": " + accumData;
+            final String conditionDetails = "Did not receive all " + expectedNumRecords +
" records from topic " + topic;
             TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
         }
         return accumData;
@@ -317,6 +313,7 @@ public class IntegrationTestUtils {
             continueConsuming(consumedValues.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
             final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+
             for (final ConsumerRecord<K, V> record : records) {
                 consumedValues.add(new KeyValue<>(record.key(), record.value()));
             }


Mime
View raw message