kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: HOTFIX: fix broken streams test
Date Thu, 06 Jul 2017 13:29:19 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6d203361a -> 0046a881d


HOTFIX: fix broken streams test

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska <eno.thereska@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3493 from dguy/hotfix-test-failure


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0046a881
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0046a881
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0046a881

Branch: refs/heads/trunk
Commit: 0046a881d389878bbb673c2591c06fd667923964
Parents: 6d20336
Author: Damian Guy <damian.guy@gmail.com>
Authored: Thu Jul 6 14:29:12 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Thu Jul 6 14:29:12 2017 +0100

----------------------------------------------------------------------
 .../processor/internals/StreamThreadTest.java   | 34 ++++++++++++++++----
 1 file changed, 27 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0046a881/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 2056954..a7f1db1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1817,7 +1817,6 @@ public class StreamThreadTest {
         final TaskId taskId = new TaskId(0, 0);
 
         final StreamThread thread = setupTest(taskId);
-
         final StateDirectory testStateDir = new StateDirectory(
             applicationId,
             config.getString(StreamsConfig.STATE_DIR_CONFIG),
@@ -1830,6 +1829,7 @@ public class StreamThreadTest {
         } catch (final Exception e) {
             assertTrue(testStateDir.lock(taskId, 0));
         } finally {
+            thread.close();
             testStateDir.unlock(taskId);
         }
     }
@@ -1839,7 +1839,6 @@ public class StreamThreadTest {
         final TaskId taskId = new TaskId(0, 0);
 
         final StreamThread thread = setupTest(taskId);
-        thread.start();
 
         final StateDirectory testStateDir = new StateDirectory(
             applicationId,
@@ -1856,7 +1855,7 @@ public class StreamThreadTest {
         }
     }
 
-    private StreamThread setupTest(final TaskId taskId) {
+    private StreamThread setupTest(final TaskId taskId) throws InterruptedException {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.setApplicationId(applicationId);
         builder.addSource("source", "topic");
@@ -1896,6 +1895,15 @@ public class StreamThreadTest {
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         activeTasks.put(testStreamTask.id, testStreamTask.partitions);
         thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
+        thread.start();
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return thread.state() == StreamThread.State.RUNNING;
+            }
+        }, "thread didn't transition to running");
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptySet());
         thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
 
         return thread;
@@ -1906,7 +1914,7 @@ public class StreamThreadTest {
         final TaskId taskId = new TaskId(0, 0);
 
         final StreamThread thread = setupStandbyTest(taskId);
-
+        startThreadAndRebalance(thread);
         final StateDirectory testStateDir = new StateDirectory(applicationId,
             config.getString(StreamsConfig.STATE_DIR_CONFIG),
             mockTime);
@@ -1918,17 +1926,29 @@ public class StreamThreadTest {
         } catch (final Exception e) {
             assertTrue(testStateDir.lock(taskId, 0));
         } finally {
+            thread.close();
             testStateDir.unlock(taskId);
         }
     }
 
+    private void startThreadAndRebalance(final StreamThread thread) throws InterruptedException {
+        thread.start();
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return thread.state() == StreamThread.State.RUNNING;
+            }
+        }, "thread didn't transition to running");
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptySet());
+        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptySet());
+    }
+
     @Test
     public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSuspendedStandbyTask() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
 
         final StreamThread thread = setupStandbyTest(taskId);
-        thread.start();
-
+        startThreadAndRebalance(thread);
         final StateDirectory testStateDir = new StateDirectory(applicationId,
             config.getString(StreamsConfig.STATE_DIR_CONFIG),
             mockTime);
@@ -1939,6 +1959,7 @@ public class StreamThreadTest {
             thread.join();
             assertTrue(testStateDir.lock(taskId, 0));
         } finally {
+            thread.close();
             testStateDir.unlock(taskId);
         }
     }
@@ -1998,7 +2019,6 @@ public class StreamThreadTest {
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
         standbyTasks.put(taskId, Collections.singleton(new TopicPartition("topic", 0)));
         thread.setPartitionAssignor(new MockStreamsPartitionAssignor(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks));
-        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptySet());
 
         return thread;
     }


Mime
View raw message