kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-5309: Stores not queryable after one thread died
Date Fri, 26 May 2017 16:42:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 5dd2a49e6 -> 1b15adde1


KAFKA-5309: Stores not queryable after one thread died

 - introduces a new thread state DEAD
 - ignores DEAD threads when querying
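
To illustrate the idea behind the fix outside of the Kafka Streams code base: once a thread can report a terminal DEAD state, store lookups can simply skip dead threads instead of failing the whole query. Below is a minimal, self-contained Java sketch of that pattern; ThreadState, FakeThread, and DeadThreadQuerySketch are hypothetical stand-ins for StreamThread.State and StreamThreadStateStoreProvider, not the actual Kafka classes (the real change appears in the diff further down).

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DeadThreadQuerySketch {

    // Hypothetical stand-in for StreamThread.State; DEAD marks a thread that has
    // exited permanently and must be ignored when stores are looked up.
    enum ThreadState { CREATED, RUNNING, PARTITIONS_REVOKED, ASSIGNING_PARTITIONS, PENDING_SHUTDOWN, DEAD }

    // Hypothetical stand-in for one stream thread: just a state plus the stores it owns.
    static class FakeThread {
        final ThreadState state;
        final List<String> stores;

        FakeThread(final ThreadState state, final List<String> stores) {
            this.state = state;
            this.stores = stores;
        }
    }

    // Mirrors the shape of the provider change below: a DEAD thread contributes an
    // empty list, so the caller falls through to the remaining live threads instead
    // of treating the query as invalid.
    static List<String> stores(final FakeThread thread, final String storeName) {
        if (thread.state == ThreadState.DEAD) {
            return Collections.emptyList();
        }
        return thread.stores.contains(storeName)
            ? Collections.singletonList(storeName)
            : Collections.<String>emptyList();
    }

    public static void main(final String[] args) {
        final List<FakeThread> threads = Arrays.asList(
            new FakeThread(ThreadState.RUNNING, Arrays.asList("word-count-store")),
            new FakeThread(ThreadState.DEAD, Arrays.asList("word-count-store")));

        final List<String> queryable = new ArrayList<>();
        for (final FakeThread thread : threads) {
            queryable.addAll(stores(thread, "word-count-store"));
        }
        // Prints [word-count-store]: the dead thread is skipped, the live one still serves queries.
        System.out.println(queryable);
    }
}

The actual change in StreamThreadStateStoreProvider below has the same shape: an early return of Collections.emptyList() when streamThread.state() is StreamThread.State.DEAD, while KafkaStreams additionally drops DEAD threads from its per-thread state map.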

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy, Eno Thereska, Guozhang Wang

Closes #3140 from mjsax/kafka-5309-stores-not-queryable

(cherry picked from commit faa1803aa35871e5e040a22b7fcec61a2be16e24)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1b15adde
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1b15adde
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1b15adde

Branch: refs/heads/0.11.0
Commit: 1b15adde1d393e5d2bbfe75db46880ce9efcd1b2
Parents: 5dd2a49
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Fri May 26 09:42:02 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri May 26 09:42:11 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |   6 +-
 .../processor/internals/StreamThread.java       |  59 ++++---
 .../StreamThreadStateStoreProvider.java         |   4 +
 .../QueryableStateIntegrationTest.java          | 164 ++++++++++++++-----
 .../processor/internals/StreamThreadTest.java   |   2 +-
 5 files changed, 166 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1b15adde/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 3b801a9..6da22ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -265,7 +265,11 @@ public class KafkaStreams {
         public synchronized void onChange(final StreamThread thread,
                                           final StreamThread.State newState,
                                           final StreamThread.State oldState) {
-            threadState.put(thread.getId(), newState);
+            if (newState != StreamThread.State.DEAD) {
+                threadState.put(thread.getId(), newState);
+            } else {
+                threadState.remove(thread.getId());
+            }
             if (newState == StreamThread.State.PARTITIONS_REVOKED ||
                 newState == StreamThread.State.ASSIGNING_PARTITIONS) {
                 setState(State.REBALANCING);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b15adde/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f16e323..1e73c89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -81,35 +81,40 @@ public class StreamThread extends Thread {
      *
      * <pre>
      *                +-------------+
-     *                | Not Running | <-------+
-     *                +-----+-------+         |
-     *                      |                 |
-     *                      v                 |
-     *                +-----+-------+         |
-     *          +<--- | Running     | <----+  |
-     *          |     +-----+-------+      |  |
-     *          |           |              |  |
-     *          |           v              |  |
-     *          |     +-----+-------+      |  |
-     *          +<--- | Partitions  |      |  |
-     *          |     | Revoked     |      |  |
-     *          |     +-----+-------+      |  |
-     *          |           |              |  |
-     *          |           v              |  |
-     *          |     +-----+-------+      |  |
-     *          |     | Assigning   |      |  |
-     *          |     | Partitions  | ---->+  |
-     *          |     +-----+-------+         |
-     *          |           |                 |
-     *          |           v                 |
-     *          |     +-----+-------+         |
-     *          +---> | Pending     | ------->+
+     *                | Created     |
+     *                +-----+-------+
+     *                      |
+     *                      v
+     *                +-----+-------+
+     *          +<--- | Running     | <----+
+     *          |     +-----+-------+      |
+     *          |           |              |
+     *          |           v              |
+     *          |     +-----+-------+      |
+     *          +<--- | Partitions  |      |
+     *          |     | Revoked     |      |
+     *          |     +-----+-------+      |
+     *          |           |              |
+     *          |           v              |
+     *          |     +-----+-------+      |
+     *          |     | Assigning   |      |
+     *          |     | Partitions  | ---->+
+     *          |     +-----+-------+
+     *          |           |
+     *          |           v
+     *          |     +-----+-------+
+     *          +---> | Pending     |
      *                | Shutdown    |
+     *                +-----+-------+
+     *                      |
+     *                      v
+     *                +-----+-------+
+     *                | Dead        |
      *                +-------------+
      * </pre>
      */
     public enum State {
-        NOT_RUNNING(1), RUNNING(1, 2, 4), PARTITIONS_REVOKED(3, 4), ASSIGNING_PARTITIONS(1, 4), PENDING_SHUTDOWN(0);
+        CREATED(1), RUNNING(1, 2, 4), PARTITIONS_REVOKED(3, 4), ASSIGNING_PARTITIONS(1, 4), PENDING_SHUTDOWN(5), DEAD;
 
         private final Set<Integer> validTransitions = new HashSet<>();
 
@@ -118,7 +123,7 @@ public class StreamThread extends Thread {
         }
 
         public boolean isRunning() {
-            return !equals(PENDING_SHUTDOWN) && !equals(NOT_RUNNING);
+            return !equals(PENDING_SHUTDOWN) && !equals(CREATED) && !equals(DEAD);
         }
 
         public boolean isValidTransition(final State newState) {
@@ -377,7 +382,7 @@ public class StreamThread extends Thread {
     }
 
 
-    private volatile State state = State.NOT_RUNNING;
+    private volatile State state = State.CREATED;
     private StreamThread.StateListener stateListener = null;
     final PartitionGrouper partitionGrouper;
     private final StreamsMetadataState streamsMetadataState;
@@ -1062,7 +1067,7 @@ public class StreamThread extends Thread {
         // clean up global tasks
 
         log.info("{} Stream thread shutdown complete", logPrefix);
-        setState(State.NOT_RUNNING);
+        setState(State.DEAD);
         streamsMetrics.removeAllSensors();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b15adde/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index 2d7ff82..45d9898 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.state.QueryableStoreType;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -40,6 +41,9 @@ public class StreamThreadStateStoreProvider implements StateStoreProvider {
     @SuppressWarnings("unchecked")
     @Override
     public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
+        if (streamThread.state() == StreamThread.State.DEAD) {
+            return Collections.emptyList();
+        }
         if (!streamThread.isInitialized()) {
             throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b15adde/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 509a7fd..d5cea24 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -70,6 +71,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
@@ -81,7 +83,6 @@ import static org.junit.Assert.fail;
 @Category({IntegrationTest.class})
 public class QueryableStateIntegrationTest {
     private static final int NUM_BROKERS = 1;
-    private static final long COMMIT_INTERVAL_MS = 300L;
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER =
@@ -129,10 +130,8 @@ public class QueryableStateIntegrationTest {
         final String applicationId = "queryable-state-" + testNo;
 
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        streamsConfiguration
-            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("qs-test").getPath());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
@@ -191,11 +190,6 @@ public class QueryableStateIntegrationTest {
 
     /**
      * Creates a typical word count topology
-     *
-     * @param inputTopic
-     * @param outputTopic
-     * @param streamsConfiguration config
-     * @return
      */
     private KafkaStreams createCountStream(final String inputTopic, final String outputTopic, final Properties streamsConfiguration) {
         final KStreamBuilder builder = new KStreamBuilder();
@@ -223,7 +217,7 @@ public class QueryableStateIntegrationTest {
     private class StreamRunnable implements Runnable {
         private final KafkaStreams myStream;
         private boolean closed = false;
-        private KafkaStreamsTest.StateListenerStub stateListener = new KafkaStreamsTest.StateListenerStub();
+        private final KafkaStreamsTest.StateListenerStub stateListener = new KafkaStreamsTest.StateListenerStub();
 
         StreamRunnable(final String inputTopic, final String outputTopic, final int queryPort) {
             final Properties props = (Properties) streamsConfiguration.clone();
@@ -253,7 +247,7 @@ public class QueryableStateIntegrationTest {
             return myStream;
         }
 
-        public final KafkaStreamsTest.StateListenerStub getStateListener() {
+        final KafkaStreamsTest.StateListenerStub getStateListener() {
             return stateListener;
         }
     }
@@ -309,7 +303,7 @@ public class QueryableStateIntegrationTest {
                     } catch (final IllegalStateException e) {
                         // Kafka Streams instance may have closed but rebalance hasn't happened
                         return false;
-                    } catch (InvalidStateStoreException e) {
+                    } catch (final InvalidStateStoreException e) {
                         // there must have been at least one rebalance state
                         assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
                         return false;
@@ -423,8 +417,8 @@ public class QueryableStateIntegrationTest {
 
     @Test
     public void shouldBeAbleToQueryFilterState() throws Exception {
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         final KStreamBuilder builder = new KStreamBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
         final Set<KeyValue<String, Long>> batch1 = new HashSet<>();
@@ -449,7 +443,7 @@ public class QueryableStateIntegrationTest {
             mockTime);
         final Predicate<String, Long> filterPredicate = new Predicate<String, Long>() {
             @Override
-            public boolean test(String key, Long value) {
+            public boolean test(final String key, final Long value) {
                 return key.contains("kafka");
             }
         };
@@ -489,8 +483,8 @@ public class QueryableStateIntegrationTest {
 
     @Test
     public void shouldBeAbleToQueryMapValuesState() throws Exception {
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         final KStreamBuilder builder = new KStreamBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
         final Set<KeyValue<String, String>> batch1 = new HashSet<>();
@@ -514,7 +508,7 @@ public class QueryableStateIntegrationTest {
         final KTable<String, String> t1 = builder.table(streamOne);
         final KTable<String, Long> t2 = t1.mapValues(new ValueMapper<String, Long>() {
             @Override
-            public Long apply(String value) {
+            public Long apply(final String value) {
                 return Long.valueOf(value);
             }
         }, Serdes.Long(), "queryMapValues");
@@ -535,8 +529,8 @@ public class QueryableStateIntegrationTest {
 
     @Test
     public void shouldBeAbleToQueryMapValuesAfterFilterState() throws Exception {
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         final KStreamBuilder builder = new KStreamBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
         final Set<KeyValue<String, String>> batch1 = new HashSet<>();
@@ -562,7 +556,7 @@ public class QueryableStateIntegrationTest {
 
         final Predicate<String, String> filterPredicate = new Predicate<String, String>() {
             @Override
-            public boolean test(String key, String value) {
+            public boolean test(final String key, final String value) {
                 return key.contains("kafka");
             }
         };
@@ -570,7 +564,7 @@ public class QueryableStateIntegrationTest {
         final KTable<String, String> t2 = t1.filter(filterPredicate, "queryFilter");
         final KTable<String, Long> t3 = t2.mapValues(new ValueMapper<String, Long>() {
             @Override
-            public Long apply(String value) {
+            public Long apply(final String value) {
                 return Long.valueOf(value);
             }
         }, Serdes.Long(), "queryMapValues");
@@ -595,7 +589,7 @@ public class QueryableStateIntegrationTest {
         }
     }
 
-    private void verifyCanQueryState(int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
+    private void verifyCanQueryState(final int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
         final KStreamBuilder builder = new KStreamBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
@@ -671,17 +665,7 @@ public class QueryableStateIntegrationTest {
                 mockTime);
 
         final int maxWaitMs = 30000;
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                try {
-                    kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-                    return true;
-                } catch (InvalidStateStoreException ise) {
-                    return false;
-                }
-            }
-        }, maxWaitMs, "waiting for store " + storeName);
+        TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
 
         final ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
 
@@ -706,7 +690,7 @@ public class QueryableStateIntegrationTest {
                 try {
                     assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()).get("hello"));
                     return true;
-                } catch (InvalidStateStoreException ise) {
+                } catch (final InvalidStateStoreException ise) {
                     return false;
                 }
             }
@@ -714,6 +698,107 @@ public class QueryableStateIntegrationTest {
 
     }
 
+    private class WaitForStore implements TestCondition {
+        private final String storeName;
+
+        WaitForStore(final String storeName) {
+            this.storeName = storeName;
+        }
+        @Override
+        public boolean conditionMet() {
+            try {
+                kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+                return true;
+            } catch (final InvalidStateStoreException ise) {
+                return false;
+            }
+        }
+    }
+
+    @Test
+    public void shouldAllowToQueryAfterThreadDied() throws Exception {
+        final AtomicBoolean beforeFailure = new AtomicBoolean(true);
+        final AtomicBoolean failed = new AtomicBoolean(false);
+        final String storeName = "store";
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<String, String> input = builder.stream(streamOne);
+        input
+            .groupByKey()
+            .reduce(new Reducer<String>() {
+                @Override
+                public String apply(final String value1, final String value2) {
+                    if (beforeFailure.get() && value1.length() > 1) {
+                        beforeFailure.set(false);
+                        throw new RuntimeException("Injected test exception");
+                    }
+                    return value1 + value2;
+                }
+            }, storeName)
+            .to(outputTopic);
+
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(final Thread t, final Throwable e) {
+                failed.set(true);
+            }
+        });
+        kafkaStreams.start();
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            streamOne,
+            Arrays.asList(KeyValue.pair("a", "1"), KeyValue.pair("a", "2"), KeyValue.pair("b", "3"), KeyValue.pair("b", "4")),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            mockTime);
+
+        final int maxWaitMs = 30000;
+        TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
+
+        final ReadOnlyKeyValueStore<String, String> store = kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore());
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return "12".equals(store.get("a")) && "34".equals(store.get("b"));
+            }
+        }, maxWaitMs, "wait for agg to be '12' and '34'");
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            streamOne,
+            Arrays.asList(KeyValue.pair("a", "5")),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            mockTime);
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return failed.get();
+            }
+        }, 30000, "wait for thread to fail");
+
+        TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
+
+        final ReadOnlyKeyValueStore<String, String> store2 = kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore());
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return "125".equals(store2.get("a")) && "34".equals(store2.get("b"));
+            }
+        }, maxWaitMs, "wait for agg to be '125' and '34'");
+
+    }
+
     private void verifyRangeAndAll(final Set<KeyValue<String, Long>> expectedCount,
                                    final ReadOnlyKeyValueStore<String, Long> myCount) {
         final Set<KeyValue<String, Long>> countRangeResults = new TreeSet<>(stringLongComparator);
@@ -783,7 +868,6 @@ public class QueryableStateIntegrationTest {
      * @param failIfKeyNotFound     if true, the test fails if an expected key is not found in store. If false,
      *                              the method merely inserts the new found key into the list of
      *                              expected keys.
-     * @throws InterruptedException
      */
     private void verifyGreaterOrEqual(final String[] keys,
                                       final Map<String, Long> expectedWindowedCount,
@@ -888,11 +972,11 @@ public class QueryableStateIntegrationTest {
             currIteration++;
         }
 
-        public synchronized int getCurrIteration() {
+        synchronized int getCurrIteration() {
             return currIteration;
         }
 
-        public synchronized void shutdown() {
+        synchronized void shutdown() {
             shutdown = true;
         }
 
@@ -909,8 +993,8 @@ public class QueryableStateIntegrationTest {
                          new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {
 
                 while (getCurrIteration() < numIterations && !shutdown) {
-                    for (int i = 0; i < inputValues.size(); i++) {
-                        producer.send(new ProducerRecord<String, String>(topic, inputValues.get(i)));
+                    for (final String value : inputValues) {
+                        producer.send(new ProducerRecord<String, String>(topic, value));
                     }
                     incrementInteration();
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b15adde/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e5b96ff..e255350 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -363,7 +363,7 @@ public class StreamThreadTest {
 
         thread.close();
         assertTrue((thread.state() == StreamThread.State.PENDING_SHUTDOWN) ||
-            (thread.state() == StreamThread.State.NOT_RUNNING));
+            (thread.state() == StreamThread.State.CREATED));
     }
 
     private final static String TOPIC = "topic";

