kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5818; KafkaStreams state transitions not correct
Date Sat, 02 Sep 2017 07:24:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk bc999989b -> 14a7c297a


KAFKA-5818; KafkaStreams state transitions not correct

- need to check that state is CRATED at startup
- some minor test cleanup

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Damian Guy <damian.guy@gmail.com>

Closes #3775 from mjsax/kafka-5818-kafkaStreams-state-transition


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/14a7c297
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/14a7c297
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/14a7c297

Branch: refs/heads/trunk
Commit: 14a7c297ab917efb28e579fa942d58cdcf4930c5
Parents: bc99998
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Sat Sep 2 08:24:43 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Sat Sep 2 08:24:43 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  | 20 +++++++++++++++++++-
 .../apache/kafka/streams/KafkaStreamsTest.java  | 19 ++++++++++++-------
 2 files changed, 31 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/14a7c297/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 1cb190e..ed8525c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -272,6 +272,24 @@ public class KafkaStreams {
         return true;
     }
 
+    private boolean setRunningFromCreated() {
+        synchronized (stateLock) {
+            if (state != State.CREATED) {
+                log.error("{} Unexpected state transition from {} to {}", logPrefix, state,
State.RUNNING);
+                throw new IllegalStateException(logPrefix + " Unexpected state transition
from " + state + " to " + State.RUNNING);
+            }
+            state = State.RUNNING;
+            stateLock.notifyAll();
+        }
+
+        // we need to call the user customized state listener outside the state lock to avoid
potential deadlocks
+        if (stateListener != null) {
+            stateListener.onChange(State.RUNNING, State.CREATED);
+        }
+
+        return true;
+    }
+
     /**
      * Return the current {@link State} of this {@code KafkaStreams} instance.
      *
@@ -675,7 +693,7 @@ public class KafkaStreams {
 
         // first set state to RUNNING before kicking off the threads,
         // making sure the state will always transit to RUNNING before REBALANCING
-        if (setState(State.RUNNING)) {
+        if (setRunningFromCreated()) {
             checkBrokerVersionCompatibility();
 
             if (globalStreamThread != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/14a7c297/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index fb51d8b..9e3ab2c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -234,7 +234,8 @@ public class KafkaStreamsTest {
         streams.close();
         try {
             streams.start();
-        } catch (final IllegalStateException e) {
+            fail("Should have throw IllegalStateException");
+        } catch (final IllegalStateException expected) {
             // this is ok
         } finally {
             streams.close();
@@ -301,14 +302,18 @@ public class KafkaStreamsTest {
         assertEquals(metrics.size(), 16);
     }
 
-    @Test(expected = ConfigException.class)
+    @Test
     public void testIllegalMetricsConfig() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
         final StreamsBuilder builder = new StreamsBuilder();
-        new KafkaStreams(builder.build(), props);
+
+        try {
+            new KafkaStreams(builder.build(), props);
+            fail("Should have throw ConfigException");
+        } catch (final ConfigException expected) { /* expected */ }
     }
 
     @Test
@@ -423,7 +428,7 @@ public class KafkaStreamsTest {
         streams.cleanUp();
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test
     public void testCannotCleanupWhileRunning() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
@@ -441,9 +446,9 @@ public class KafkaStreamsTest {
         }, 10 * 1000, "Streams never started.");
         try {
             streams.cleanUp();
-        } catch (final IllegalStateException e) {
-            assertEquals("Cannot clean up while running.", e.getMessage());
-            throw e;
+            fail("Should have thrown IllegalStateException");
+        } catch (final IllegalStateException expected) {
+            assertEquals("Cannot clean up while running.", expected.getMessage());
         } finally {
             streams.close();
         }


Mime
View raw message