kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3926: Fix transient test failure in RegexSourceIntegrationTest
Date Wed, 06 Jul 2016 19:32:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7218648ae -> efc4c8881


KAFKA-3926: Fix transient test failure in RegexSourceIntegrationTest

…Tasks call versus simply asserting createStreamTasks method called.

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1590 from bbejeck/KAFKA-3926-transient-failures-regex-source-integration-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/efc4c888
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/efc4c888
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/efc4c888

Branch: refs/heads/trunk
Commit: efc4c88811c93d11eeffe61310b7f8652ca73ceb
Parents: 7218648
Author: Bill Bejeck <bbejeck@gmail.com>
Authored: Wed Jul 6 12:32:09 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jul 6 12:32:09 2016 -0700

----------------------------------------------------------------------
 .../integration/RegexSourceIntegrationTest.java | 60 ++++++++++++--------
 1 file changed, 37 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/efc4c888/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index b0a1e96..5a30af5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -138,21 +138,34 @@ public class RegexSourceIntegrationTest {
         StreamThread[] streamThreads =  (StreamThread[]) streamThreadsField.get(streams);
         StreamThread originalThread = streamThreads[0];
 
-        TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig,
+        final TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig,
                                            new DefaultKafkaClientSupplier(),
                                            originalThread.applicationId, originalThread.clientId,
originalThread.processId, new Metrics(), new SystemTime());
 
-        TestCondition tasksUpdated = createTasksUpdatedCondition(testStreamThread);
+        TestCondition oneTopicAdded  = new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                List<String> assignedTopics = testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE);
+                return assignedTopics != null && assignedTopics.contains("TEST-TOPIC-1")
&& !assignedTopics.contains("TEST-TOPIC-2");
+            }
+        };
 
         streamThreads[0] = testStreamThread;
         streams.start();
 
-        TestUtils.waitForCondition(tasksUpdated, STREAM_TASKS_NOT_UPDATED);
-        testStreamThread.streamTaskUpdated = false;
+        TestUtils.waitForCondition(oneTopicAdded,  STREAM_TASKS_NOT_UPDATED);
 
         CLUSTER.createTopic("TEST-TOPIC-2");
 
-        TestUtils.waitForCondition(tasksUpdated, STREAM_TASKS_NOT_UPDATED);
+        TestCondition secondTopicAdded  = new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                List<String> assignedTopics = testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE);
+                return assignedTopics != null && assignedTopics.contains("TEST-TOPIC-1")
&& assignedTopics.contains("TEST-TOPIC-2");
+            }
+        };
+
+        TestUtils.waitForCondition(secondTopicAdded,  STREAM_TASKS_NOT_UPDATED);
 
         streams.close();
 
@@ -186,22 +199,35 @@ public class RegexSourceIntegrationTest {
         StreamThread[] streamThreads =  (StreamThread[]) streamThreadsField.get(streams);
         StreamThread originalThread = streamThreads[0];
 
-        TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig,
+        final TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig,
                 new DefaultKafkaClientSupplier(),
                 originalThread.applicationId, originalThread.clientId, originalThread.processId,
new Metrics(), new SystemTime());
 
         streamThreads[0] = testStreamThread;
-        TestCondition tasksUpdated = createTasksUpdatedCondition(testStreamThread);
 
+        TestCondition bothTopicsAdded  = new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                List<String> assignedTopics = testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE);
+                return assignedTopics != null && assignedTopics.contains("TEST-TOPIC-A")
&& assignedTopics.contains("TEST-TOPIC-B");
+            }
+        };
         streams.start();
 
-        TestUtils.waitForCondition(tasksUpdated, STREAM_TASKS_NOT_UPDATED);
-        //reset
-        testStreamThread.streamTaskUpdated = false;
+        TestUtils.waitForCondition(bothTopicsAdded,  STREAM_TASKS_NOT_UPDATED);
 
         CLUSTER.deleteTopic("TEST-TOPIC-A");
 
-        TestUtils.waitForCondition(tasksUpdated, STREAM_TASKS_NOT_UPDATED);
+
+        TestCondition oneTopicRemoved  = new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                List<String> assignedTopics = testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE);
+                return assignedTopics != null && !assignedTopics.contains("TEST-TOPIC-A")
&& assignedTopics.contains("TEST-TOPIC-B");
+            }
+        };
+
+        TestUtils.waitForCondition(oneTopicRemoved,  STREAM_TASKS_NOT_UPDATED);
 
         streams.close();
 
@@ -310,7 +336,6 @@ public class RegexSourceIntegrationTest {
 
         public Map<Integer, List<String>> assignedTopicPartitions = new HashMap<>();
         private int index =  0;
-        public volatile boolean streamTaskUpdated = false;
 
         public TestStreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier
clientSupplier, String applicationId, String clientId, UUID processId, Metrics metrics, Time
time) {
             super(builder, config, clientSupplier, applicationId, clientId, processId, metrics,
time);
@@ -323,21 +348,10 @@ public class RegexSourceIntegrationTest {
                 assignedTopics.add(partition.topic());
             }
             Collections.sort(assignedTopics);
-            streamTaskUpdated = true;
             assignedTopicPartitions.put(index++, assignedTopics);
             return super.createStreamTask(id, partitions);
         }
 
     }
 
-
-    private TestCondition createTasksUpdatedCondition(final TestStreamThread testStreamThread)
{
-        return new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return testStreamThread.streamTaskUpdated;
-            }
-        };
-    }
-
 }


Mime
View raw message