kafka-commits mailing list archives

From ewe...@apache.org
Subject kafka git commit: KAFKA-4916: test streams with brokers failing
Date Thu, 06 Apr 2017 21:51:10 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 095099495 -> 51ae9c249


KAFKA-4916: test streams with brokers failing

Author: Eno Thereska <eno@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2793 from enothereska/KAFKA-4916-0.10.2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51ae9c24
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51ae9c24
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51ae9c24

Branch: refs/heads/0.10.2
Commit: 51ae9c2491e90da88391467b8e25f6e98fd660dc
Parents: 0950994
Author: Eno Thereska <eno@confluent.io>
Authored: Thu Apr 6 14:51:05 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu Apr 6 14:51:05 2017 -0700

----------------------------------------------------------------------
 .../internals/InternalTopicManager.java         |  32 ++-
 .../internals/StreamPartitionAssignor.java      |  12 +-
 .../processor/internals/StreamThread.java       |  65 +++---
 .../processor/internals/StreamsKafkaClient.java |  19 +-
 .../internals/InternalTopicManagerTest.java     |  19 +-
 .../kafka/streams/tests/SmokeTestClient.java    |  41 +++-
 .../kafka/streams/tests/SmokeTestDriver.java    |  20 +-
 .../kafka/test/MockInternalTopicManager.java    |   3 +-
 tests/kafkatest/services/streams.py             |   3 +-
 .../tests/streams/streams_bounce_test.py        |   4 +-
 .../tests/streams/streams_broker_bounce_test.py | 213 +++++++++++++++++++
 .../tests/streams/streams_smoke_test.py         |   4 +-
 12 files changed, 372 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
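
Note on the pattern (not part of the patch itself): the central change above is a bounded retry loop with a fixed backoff in InternalTopicManager, driven by an injectable org.apache.kafka.common.utils.Time so that tests can pass a MockTime instead of sleeping on the wall clock. Below is a minimal, self-contained sketch of that pattern; the names RetryingMetadataFetcher, MetadataClient and fetchWithRetries are hypothetical and do not exist in the Kafka code base.

    import org.apache.kafka.common.utils.Time;

    public class RetryingMetadataFetcher {
        private static final int MAX_TRIES = 5;       // mirrors MAX_TOPIC_READY_TRY in the patch
        private static final long BACKOFF_MS = 100L;  // mirrors the 100 ms backoff in the patch

        // Hypothetical stand-in for StreamsKafkaClient; may throw on transient broker failures.
        interface MetadataClient {
            int fetchBrokerCount();
        }

        private final MetadataClient client;
        private final Time time;                      // Time.SYSTEM in production, MockTime in tests

        public RetryingMetadataFetcher(final MetadataClient client, final Time time) {
            this.client = client;
            this.time = time;
        }

        public int fetchWithRetries() {
            for (int i = 0; i < MAX_TRIES; i++) {
                try {
                    return client.fetchBrokerCount();
                } catch (final RuntimeException e) {
                    System.out.println("Could not fetch metadata: " + e.getMessage() + " Retry #" + i);
                }
                time.sleep(BACKOFF_MS);               // MockTime advances virtual time without blocking
            }
            throw new RuntimeException("Could not fetch metadata after " + MAX_TRIES + " tries.");
        }
    }

Constructing the fetcher with new MockTime() in a test keeps the retries from taking real time, which is the same reason the patch threads a Time instance through InternalTopicManager and StreamPartitionAssignor.
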


http://git-wip-us.apache.org/repos/asf/kafka/blob/51ae9c24/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 52536e1..a0bdb9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,16 +37,18 @@ public class InternalTopicManager {
     public static final String RETENTION_MS = "retention.ms";
     public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
     private static final int MAX_TOPIC_READY_TRY = 5;
-
+    private final Time time;
     private final long windowChangeLogAdditionalRetention;
 
     private final int replicationFactor;
     private final StreamsKafkaClient streamsKafkaClient;
 
-    public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor, final long windowChangeLogAdditionalRetention) {
+    public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor,
+                                final long windowChangeLogAdditionalRetention, final Time time) {
         this.streamsKafkaClient = streamsKafkaClient;
         this.replicationFactor = replicationFactor;
         this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention;
+        this.time = time;
     }
 
     /**
@@ -61,11 +64,19 @@ public class InternalTopicManager {
                 final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
                 final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
                 final Map<InternalTopicConfig, Integer> topicsToBeCreated = validateTopicPartitions(topics, existingTopicPartitions);
+                if (metadata.brokers().size() < replicationFactor) {
+                    throw new StreamsException("Found only " + metadata.brokers().size()
+ " brokers, " +
+                        " but replication factor is " + replicationFactor + "." +
+                        " Decrease replication factor for internal topics via StreamsConfig
parameter \"replication.factor\""  +
+                        " or add more brokers to your cluster.");
+                }
                 streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention, metadata);
                 return;
             } catch (StreamsException ex) {
                 log.warn("Could not create internal topics: " + ex.getMessage() + " Retry
#" + i);
             }
+            // backoff
+            time.sleep(100L);
         }
         throw new StreamsException("Could not create internal topics.");
     }
@@ -74,11 +85,20 @@ public class InternalTopicManager {
      * Get the number of partitions for the given topics
      */
     public Map<String, Integer> getNumPartitions(final Set<String> topics) {
-        final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
-        final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
-        existingTopicPartitions.keySet().retainAll(topics);
+        for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
+            try {
+                final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
+                final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
+                existingTopicPartitions.keySet().retainAll(topics);
 
-        return existingTopicPartitions;
+                return existingTopicPartitions;
+            } catch (StreamsException ex) {
+                log.warn("Could not get number of partitions: " + ex.getMessage() + " Retry
#" + i);
+            }
+            // backoff
+            time.sleep(100L);
+        }
+        throw new StreamsException("Could not get number of partitions.");
     }
 
     public void close() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/51ae9c24/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 1ad6dbc..a50a819 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
@@ -58,6 +59,7 @@ import static org.apache.kafka.streams.processor.internals.InternalTopicManager.
 public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
 
     private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
+    private Time time = Time.SYSTEM;
 
     private final static int UNKNOWN = -1;
     public final static int NOT_AVAILABLE = -2;
@@ -163,6 +165,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     private CopartitionedTopicsValidator copartitionedTopicsValidator;
 
     /**
+     * Package-private method to set the time. Used for tests.
+     * @param time Time to be used.
+     */
+    void time(final Time time) {
+        this.time = time;
+    }
+
+    /**
      * We need to have the PartitionAssignor and its StreamThread to be mutually accessible
      * since the former needs later's cached metadata while sending subscriptions,
      * and the latter needs former's returned assignment when adding tasks.
@@ -211,7 +221,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1,
                 configs.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) ?
                         (Long) configs.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)
-                        : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+                        : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
 
         this.copartitionedTopicsValidator = new CopartitionedTopicsValidator(streamThread.getName());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/51ae9c24/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 0bf797f..a0ee9a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -436,17 +436,21 @@ public class StreamThread extends Thread {
 
     @SuppressWarnings("ThrowableNotThrown")
     private void shutdownTasksAndState() {
-        log.debug("{} shutdownTasksAndState: shutting down all active tasks {} and standby
tasks {}", logPrefix,
-            activeTasks.keySet(), standbyTasks.keySet());
+        log.debug("{} shutdownTasksAndState: shutting down" +
+                "active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby
tasks {}",
+            logPrefix, activeTasks.keySet(), standbyTasks.keySet(),
+            suspendedTasks.keySet(), suspendedStandbyTasks.keySet());
 
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
         // Close all processors in topology order
-        firstException.compareAndSet(null, closeAllTasks());
+        firstException.compareAndSet(null, closeTasks(activeAndStandbytasks()));
+        firstException.compareAndSet(null, closeTasks(suspendedAndSuspendedStandbytasks()));
         // flush state
         firstException.compareAndSet(null, flushAllState());
         // Close all task state managers. Don't need to set exception as all
         // state would have been flushed above
-        closeAllStateManagers(firstException.get() == null);
+        closeStateManagers(activeAndStandbytasks(), firstException.get() == null);
+        closeStateManagers(suspendedAndSuspendedStandbytasks(), firstException.get() == null);
         // only commit under clean exit
         if (cleanRun && firstException.get() == null) {
             firstException.set(commitOffsets());
@@ -465,7 +469,7 @@ public class StreamThread extends Thread {
             activeTasks.keySet(), standbyTasks.keySet());
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
         // Close all topology nodes
-        firstException.compareAndSet(null, closeAllTasksTopologies());
+        firstException.compareAndSet(null, closeActiveAndStandbyTasksTopologies());
         // flush state
         firstException.compareAndSet(null, flushAllState());
         // only commit after all state has been flushed and there hasn't been an exception
@@ -488,21 +492,20 @@ public class StreamThread extends Thread {
         void apply(final AbstractTask task);
     }
 
-    private RuntimeException performOnAllTasks(final AbstractTaskAction action,
-                                   final String exceptionMessage) {
+    private RuntimeException performOnTasks(final List<AbstractTask> tasks,
+                                            final AbstractTaskAction action,
+                                            final String exceptionMessage) {
         RuntimeException firstException = null;
-        final List<AbstractTask> allTasks = new ArrayList<AbstractTask>(activeTasks.values());
-        allTasks.addAll(standbyTasks.values());
-        for (final AbstractTask task : allTasks) {
+        for (final AbstractTask task : tasks) {
             try {
                 action.apply(task);
             } catch (RuntimeException t) {
                 log.error("{} Failed while executing {} {} due to {}: ",
-                        StreamThread.this.logPrefix,
-                        task.getClass().getSimpleName(),
-                        task.id(),
-                        exceptionMessage,
-                        t);
+                    StreamThread.this.logPrefix,
+                    task.getClass().getSimpleName(),
+                    task.id(),
+                    exceptionMessage,
+                    t);
                 if (firstException == null) {
                     firstException = t;
                 }
@@ -511,8 +514,20 @@ public class StreamThread extends Thread {
         return firstException;
     }
 
-    private Throwable closeAllStateManagers(final boolean writeCheckpoint) {
-        return performOnAllTasks(new AbstractTaskAction() {
+    private List<AbstractTask> activeAndStandbytasks() {
+        final List<AbstractTask> tasks = new ArrayList<AbstractTask>(activeTasks.values());
+        tasks.addAll(standbyTasks.values());
+        return tasks;
+    }
+
+    private List<AbstractTask> suspendedAndSuspendedStandbytasks() {
+        final List<AbstractTask> tasks = new ArrayList<AbstractTask>(suspendedTasks.values());
+        tasks.addAll(suspendedStandbyTasks.values());
+        return tasks;
+    }
+
+    private Throwable closeStateManagers(final List<AbstractTask> tasks, final boolean writeCheckpoint) {
+        return performOnTasks(tasks, new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
                 log.info("{} Closing the state manager of task {}", StreamThread.this.logPrefix,
task.id());
@@ -523,7 +538,7 @@ public class StreamThread extends Thread {
 
     private RuntimeException commitOffsets() {
         // Exceptions should not prevent this call from going through all shutdown steps
-        return performOnAllTasks(new AbstractTaskAction() {
+        return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
                 log.info("{} Committing consumer offsets of task {}", StreamThread.this.logPrefix,
task.id());
@@ -533,7 +548,7 @@ public class StreamThread extends Thread {
     }
 
     private RuntimeException flushAllState() {
-        return performOnAllTasks(new AbstractTaskAction() {
+        return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
                 log.info("{} Flushing state stores of task {}", StreamThread.this.logPrefix,
task.id());
@@ -1053,22 +1068,22 @@ public class StreamThread extends Thread {
         standbyRecords.clear();
     }
 
-    private RuntimeException closeAllTasks() {
-        return performOnAllTasks(new AbstractTaskAction() {
+    private RuntimeException closeTasks(final List<AbstractTask> tasks) {
+        return performOnTasks(tasks, new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
-                log.info("{} Closing a task {}", StreamThread.this.logPrefix, task.id());
+                log.info("{} Closing task {}", StreamThread.this.logPrefix, task.id());
                 task.close();
                 streamsMetrics.tasksClosedSensor.record();
             }
         }, "close");
     }
 
-    private RuntimeException closeAllTasksTopologies() {
-        return performOnAllTasks(new AbstractTaskAction() {
+    private RuntimeException closeActiveAndStandbyTasksTopologies() {
+        return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
-                log.info("{} Closing a task's topology {}", StreamThread.this.logPrefix,
task.id());
+                log.info("{} Closing task's topology {}", StreamThread.this.logPrefix, task.id());
                 task.closeTopology();
                 streamsMetrics.tasksClosedSensor.record();
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/51ae9c24/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 60dd416..bb75759 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -198,7 +198,11 @@ public class StreamsKafkaClient {
                     break;
                 }
             }
-            kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+            try {
+                kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+            } catch (final Exception e) {
+                throw new StreamsException("Could not poll.", e);
+            }
         }
         if (brokerId == null) {
             throw new StreamsException("Could not find any available broker.");
@@ -230,11 +234,20 @@ public class StreamsKafkaClient {
     }
 
     private ClientResponse sendRequest(final ClientRequest clientRequest) {
-        kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
+        try {
+            kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
+        } catch (final Exception e) {
+            throw new StreamsException("Could not send request.", e);
+        }
         final long responseTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
         // Poll for the response.
         while (Time.SYSTEM.milliseconds() < responseTimeout) {
-            List<ClientResponse> responseList = kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+            final List<ClientResponse> responseList;
+            try {
+                responseList = kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+            } catch (final IllegalStateException e) {
+                throw new StreamsException("Could not poll.", e);
+            }
             if (!responseList.isEmpty()) {
                 if (responseList.size() > 1) {
                     throw new StreamsException("Sent one request but received multiple or
no responses.");

http://git-wip-us.apache.org/repos/asf/kafka/blob/51ae9c24/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 69b3c46..2169496 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -20,6 +20,8 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -41,7 +43,7 @@ public class InternalTopicManagerTest {
     private final String topic = "test_topic";
     private final String userEndPoint = "localhost:2171";
     private MockStreamKafkaClient streamsKafkaClient;
-
+    private final Time time = new MockTime();
     @Before
     public void init() {
         final StreamsConfig config = new StreamsConfig(configProps());
@@ -55,19 +57,22 @@ public class InternalTopicManagerTest {
 
     @Test
     public void shouldReturnCorrectPartitionCounts() throws Exception {
-        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
+            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
         Assert.assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic)));
     }
 
     @Test
     public void shouldCreateRequiredTopics() throws Exception {
-        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
+            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
         internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 1));
     }
 
     @Test
     public void shouldNotCreateTopicIfExistsWithDifferentPartitions() throws Exception {
-        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
+            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
         boolean exceptionWasThrown = false;
         try {
             internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 2));
@@ -105,9 +110,9 @@ public class InternalTopicManagerTest {
             Node node = new Node(1, "host1", 1001);
             MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>());
             MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata));
-            MetadataResponse response = new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID,
-                Collections.singletonList(topicMetadata), 0);
+            MetadataResponse response = new MetadataResponse(Collections.<Node>singletonList(node), null, MetadataResponse.NO_CONTROLLER_ID,
+                Collections.singletonList(topicMetadata));
             return response;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/51ae9c24/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 35f1883..d192126 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Aggregator;
@@ -40,6 +41,7 @@ public class SmokeTestClient extends SmokeTestUtil {
     private final File stateDir;
     private KafkaStreams streams;
     private Thread thread;
+    private boolean uncaughtException = false;
 
     public SmokeTestClient(File stateDir, String kafka) {
         super();
@@ -52,10 +54,19 @@ public class SmokeTestClient extends SmokeTestUtil {
         streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(Thread t, Throwable e) {
+                System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
+                uncaughtException = true;
                 e.printStackTrace();
             }
         });
 
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                close();
+            }
+        }));
+
         thread = new Thread() {
             public void run() {
                 streams.start();
@@ -65,32 +76,38 @@ public class SmokeTestClient extends SmokeTestUtil {
     }
 
     public void close() {
-        streams.close();
+        streams.close(5, TimeUnit.SECONDS);
+        // do not remove these printouts since they are needed for health scripts
+        if (!uncaughtException) {
+            System.out.println("SMOKE-TEST-CLIENT-CLOSED");
+        }
         try {
             thread.join();
         } catch (Exception ex) {
+            // do not remove these printouts since they are needed for health scripts
+            System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
             // ignore
         }
     }
 
     private static KafkaStreams createKafkaStreams(File stateDir, String kafka) {
-        Properties props = new Properties();
+        final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
         props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+        props.put(ProducerConfig.ACKS_CONFIG, "all");
 
-        KStreamBuilder builder = new KStreamBuilder();
 
+        KStreamBuilder builder = new KStreamBuilder();
         KStream<String, Integer> source = builder.stream(stringSerde, intSerde, "data");
-
         source.to(stringSerde, intSerde, "echo");
-
         KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
             @Override
             public boolean test(String key, Integer value) {
@@ -211,7 +228,17 @@ public class SmokeTestClient extends SmokeTestUtil {
                     "cntByCnt"
         ).to(stringSerde, longSerde, "tagg");
 
-        return new KafkaStreams(builder, props);
+        final KafkaStreams streamsClient = new KafkaStreams(builder, props);
+        streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                System.out.println("FATAL: An unexpected exception is encountered on thread
" + t + ": " + e);
+                
+                streamsClient.close(30, TimeUnit.SECONDS);
+            }
+        });
+
+        return streamsClient;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/51ae9c24/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 6f29e00..c2cfd84 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -131,13 +131,17 @@ public class SmokeTestDriver extends SmokeTestUtil {
     }
 
     public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) throws Exception {
-        Properties props = new Properties();
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-
-        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        // the next 4 config values make sure that all records are produced with no loss and
+        // no duplicates
+        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
 
         int numRecordsProduced = 0;
 
@@ -232,7 +236,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         }
         int retry = 0;
         final long start = System.currentTimeMillis();
-        while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(3)) {
+        while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
             ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
             if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                 if (verifyMin(min, allData, false)

http://git-wip-us.apache.org/repos/asf/kafka/blob/51ae9c24/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
index 28aca34..0813f39 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
@@ -19,6 +19,7 @@ package org.apache.kafka.test;
 
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
 import org.apache.kafka.streams.processor.internals.InternalTopicManager;
@@ -36,7 +37,7 @@ public class MockInternalTopicManager extends InternalTopicManager {
     private MockConsumer<byte[], byte[]> restoreConsumer;
 
     public MockInternalTopicManager(StreamsConfig streamsConfig, MockConsumer<byte[], byte[]> restoreConsumer) {
-        super(new StreamsKafkaClient(streamsConfig), 0, 0);
+        super(new StreamsKafkaClient(streamsConfig), 0, 0, new MockTime());
 
         this.restoreConsumer = restoreConsumer;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/51ae9c24/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index a0fa412..8bc4d9e 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -50,6 +50,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
         self.kafka = kafka
         self.args = {'streams_class_name': streams_class_name,
                      'user_test_args': user_test_args}
+        self.log_level = "DEBUG"
 
     @property
     def node(self):
@@ -94,7 +95,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
             self.logger.info("Restarting Kafka Streams on " + str(node.account))
             self.start_node(node)
 
-    def wait(self, timeout_sec=360):
+    def wait(self, timeout_sec=1440):
         for node in self.nodes:
             self.wait_node(node, timeout_sec)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/51ae9c24/tests/kafkatest/tests/streams/streams_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_bounce_test.py b/tests/kafkatest/tests/streams/streams_bounce_test.py
index 169bbc1..7ac7939 100644
--- a/tests/kafkatest/tests/streams/streams_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_bounce_test.py
@@ -26,7 +26,7 @@ class StreamsBounceTest(KafkaTest):
     """
 
     def __init__(self, test_context):
-        super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={
+        super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
             'echo' : { 'partitions': 5, 'replication-factor': 2 },
             'data' : { 'partitions': 5, 'replication-factor': 2 },
             'min' : { 'partitions': 5, 'replication-factor': 2 },
@@ -42,7 +42,7 @@ class StreamsBounceTest(KafkaTest):
         self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
         self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
 
-    @cluster(num_nodes=5)
+    @cluster(num_nodes=6)
     def test_bounce(self):
         """
         Start a smoke test client, then abort (kill -9) and restart it a few times.

http://git-wip-us.apache.org/repos/asf/kafka/blob/51ae9c24/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
new file mode 100644
index 0000000..86c19f9
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -0,0 +1,213 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark.resource import cluster
+from ducktape.mark import matrix
+from ducktape.mark import parametrize, ignore
+from kafkatest.services.kafka import KafkaService
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+import time
+import signal
+from random import randint
+
+def broker_node(test, topic, broker_type):
+    """ Discover node of requested type. For leader type, discovers leader for our topic
and partition 0
+    """
+    if broker_type == "leader":
+        node = test.kafka.leader(topic, partition=0)
+    elif broker_type == "controller":
+        node = test.kafka.controller()
+    else:
+        raise Exception("Unexpected broker type %s." % (broker_type))
+
+    return node
+
+def signal_node(test, node, sig):
+    test.kafka.signal_node(node, sig)
+    
+def clean_shutdown(test, topic, broker_type):
+    """Discover broker node of requested type and shut it down cleanly.
+    """
+    node = broker_node(test, topic, broker_type)
+    signal_node(test, node, signal.SIGTERM)
+
+def hard_shutdown(test, topic, broker_type):
+    """Discover broker node of requested type and shut it down with a hard kill."""
+    node = broker_node(test, topic, broker_type)
+    signal_node(test, node, signal.SIGKILL)
+
+    
+failures = {
+    "clean_shutdown": clean_shutdown,
+    "hard_shutdown": hard_shutdown
+}
+        
+class StreamsBrokerBounceTest(Test):
+    """
+    Simple test of Kafka Streams with brokers failing
+    """
+
+    def __init__(self, test_context):
+        super(StreamsBrokerBounceTest, self).__init__(test_context)
+        self.replication = 3
+        self.partitions = 3
+        self.topics = {
+            'echo' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                       'configs': {"min.insync.replicas": 2}},
+            'data' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                       'configs': {"min.insync.replicas": 2} },
+            'min' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": 2} },
+            'max' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": 2} },
+            'sum' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": 2} },
+            'dif' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": 2} },
+            'cnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": 2} },
+            'avg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": 2} },
+            'wcnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                       'configs': {"min.insync.replicas": 2} },
+            'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                       'configs': {"min.insync.replicas": 2} }
+        }
+
+    def fail_broker_type(self, failure_mode, broker_type):
+        # Pick a random topic and bounce its leader
+        topic_index = randint(0, len(self.topics.keys()) - 1)
+        topic = self.topics.keys()[topic_index]
+        failures[failure_mode](self, topic, broker_type)
+
+    def fail_many_brokers(self, failure_mode, num_failures):
+        sig = signal.SIGTERM
+        if (failure_mode == "clean_shutdown"):
+            sig = signal.SIGTERM
+        else:
+            sig = signal.SIGKILL
+            
+        for num in range(0, num_failures - 1):
+            signal_node(self, self.kafka.nodes[num], sig)
+
+        
+    def setup_system(self):
+         # Setup phase
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.zk.start()
+        
+        self.kafka = KafkaService(self.test_context, num_nodes=self.replication,
+                                  zk=self.zk, topics=self.topics)
+        self.kafka.start()
+        # Start test harness
+        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
+
+        
+        self.driver.start()
+        self.processor1.start()
+
+    def collect_results(self, sleep_time_secs):
+        data = {}
+        # End test
+        self.driver.wait()
+        self.driver.stop()
+
+        self.processor1.stop()
+
+        node = self.driver.node
+        
+        # Success is declared if streams does not crash when sleep time > 0
+        # It should give an exception when sleep time is 0 since we kill the brokers immediately
+        # and the topic manager cannot create internal topics with the desired replication factor
+        if (sleep_time_secs == 0):
+            output_streams = self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-EXCEPTION %s" % self.processor1.STDOUT_FILE, allow_fail=False)
+        else:
+            output_streams = self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
+            
+        for line in output_streams:
+            data["Client closed"] = line
+
+        # Currently it is hard to guarantee anything about Kafka since we don't have exactly once.
+        # With exactly once in place, success will be defined as ALL-RECORDS-DELIVERED and SUCCESS
+        output = node.account.ssh_capture("grep -E 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED|PROCESSED-LESS-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False)
+        for line in output:
+            data["Records Delivered"] = line
+        output = node.account.ssh_capture("grep -E 'SUCCESS|FAILURE' %s" % self.driver.STDOUT_FILE, allow_fail=False)
+        for line in output:
+            data["Logic Success/Failure"] = line
+            
+        
+        return data
+
+    @cluster(num_nodes=7)
+    @matrix(failure_mode=["clean_shutdown", "hard_shutdown"],
+            broker_type=["leader", "controller"],
+            sleep_time_secs=[120])
+    def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs):
+        """
+        Start a smoke test client, then kill one particular broker and ensure data is still received
+        Record if records are delivered. 
+        """
+        self.setup_system() 
+
+        # Sleep to allow test to run for a bit
+        time.sleep(sleep_time_secs)
+
+        # Fail brokers
+        self.fail_broker_type(failure_mode, broker_type);
+
+        return self.collect_results(sleep_time_secs)
+
+    @cluster(num_nodes=7)
+    @matrix(failure_mode=["clean_shutdown"],
+            broker_type=["controller"],
+            sleep_time_secs=[0])
+    def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time_secs):
+        """
+        Start a smoke test client, then kill one particular broker immediately before streams starts.
+        Streams should throw an exception since it cannot create topics with the desired
+        replication factor of 3
+        """
+        self.setup_system() 
+
+        # Sleep to allow test to run for a bit
+        time.sleep(sleep_time_secs)
+
+        # Fail brokers
+        self.fail_broker_type(failure_mode, broker_type);
+
+        return self.collect_results(sleep_time_secs)
+    
+    @cluster(num_nodes=7)
+    @matrix(failure_mode=["clean_shutdown", "hard_shutdown"],
+            num_failures=[2])
+    def test_many_brokers_bounce(self, failure_mode, num_failures):
+        """
+        Start a smoke test client, then kill a few brokers and ensure data is still received
+        Record if records are delivered
+        """
+        self.setup_system() 
+
+        # Sleep to allow test to run for a bit
+        time.sleep(120)
+
+        # Fail brokers
+        self.fail_many_brokers(failure_mode, num_failures);
+
+        return self.collect_results(120)

http://git-wip-us.apache.org/repos/asf/kafka/blob/51ae9c24/tests/kafkatest/tests/streams/streams_smoke_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py
index a824d92..496c495 100644
--- a/tests/kafkatest/tests/streams/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_smoke_test.py
@@ -27,7 +27,7 @@ class StreamsSmokeTest(KafkaTest):
     """
 
     def __init__(self, test_context):
-        super(StreamsSmokeTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={
+        super(StreamsSmokeTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
             'echo' : { 'partitions': 5, 'replication-factor': 1 },
             'data' : { 'partitions': 5, 'replication-factor': 1 },
             'min' : { 'partitions': 5, 'replication-factor': 1 },
@@ -46,7 +46,7 @@ class StreamsSmokeTest(KafkaTest):
         self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
         self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
 
-    @cluster(num_nodes=8)
+    @cluster(num_nodes=9)
     def test_streams(self):
         """
         Start a few smoke test clients, then repeat start a new one, stop (cleanly) running one a few times.

