kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: MINOR: Broker down for significant amt of time system test
Date Tue, 19 Dec 2017 23:37:31 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 88c2b6849 -> f3b9afe62


MINOR: Broker down for significant amt of time system test

System test where a broker is offline for longer than the configured timeouts.  In this case:
- Max poll interval set to 45 secs
- Retries set to 2
- Request timeout set to 15 seconds
- Max block ms set to 30 seconds

The broker was taken offline for 70 seconds, more than double request timeout * num retries (2 * 2 * 15 seconds = 60 seconds)
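
As an illustrative sketch (not part of this patch), these four settings would be routed to the embedded clients through StreamsConfig's consumer./producer. prefixes roughly like this; the application id and bootstrap address are placeholders:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class ResilienceConfigSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "resilience-sketch");   // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
            // the four timeouts this test exercises, prefixed for the embedded clients
            props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 45000);
            props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 2);
            props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 15000);
            props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 30000);
        }
    }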

[passing system test results](http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2017-12-11--001.1513034559--bbejeck--KSTREAMS_1179_broker_down_for_significant_amt_of_time--6ab4802/report.html)

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #4313 from bbejeck/KSTREAMS_1179_broker_down_for_significant_amt_of_time


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f3b9afe6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f3b9afe6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f3b9afe6

Branch: refs/heads/trunk
Commit: f3b9afe62265d7559ef65f5aa692feecc5fa8f25
Parents: 88c2b68
Author: Bill Bejeck <bill@confluent.io>
Authored: Tue Dec 19 15:37:21 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Dec 19 15:37:21 2017 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java |   7 +
 .../tests/StreamsBrokerDownResilienceTest.java  | 139 +++++++++++++++++++
 tests/kafkatest/services/streams.py             |   7 +
 .../streams_broker_down_resilience_test.py      | 114 +++++++++++++++
 4 files changed, 267 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f3b9afe6/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index ecc8409..21c9759 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -76,6 +76,13 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
  * StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
  * }</pre>
  *
+ * When increasing both {@link ProducerConfig#RETRIES_CONFIG} and {@link ProducerConfig#MAX_BLOCK_MS_CONFIG} to be more resilient to non-available brokers you should also
+ * consider increasing {@link ConsumerConfig#MAX_POLL_INTERVAL_MS_CONFIG} using the following guidance:
+ * <pre>
+ *     max.poll.interval.ms > min(max.block.ms, (retries + 1) * request.timeout.ms)
+ * </pre>
+ *
+ *
  * Kafka Streams requires at least the following properties to be set:
  * <ul>
  *  <li>{@link #APPLICATION_ID_CONFIG "application.id"}</li>
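
A quick standalone check of that guidance against the values used in this system test (illustrative only, not in the patch):

    public class GuidanceCheck {
        public static void main(final String[] args) {
            final long maxPollIntervalMs = 45000; // consumer max.poll.interval.ms
            final long maxBlockMs = 30000;        // producer max.block.ms
            final long requestTimeoutMs = 15000;  // producer request.timeout.ms
            final int retries = 2;                // producer retries

            // min(30000, (2 + 1) * 15000) = min(30000, 45000) = 30000
            final long bound = Math.min(maxBlockMs, (retries + 1) * requestTimeoutMs);
            System.out.println(maxPollIntervalMs > bound); // true: 45000 > 30000
        }
    }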

http://git-wip-us.apache.org/repos/asf/kafka/blob/f3b9afe6/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
new file mode 100644
index 0000000..c8462ca
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.test.TestUtils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class StreamsBrokerDownResilienceTest {
+
+    private static final int KEY = 0;
+    private static final int VALUE = 1;
+
+    private static final String SOURCE_TOPIC_1 = "streamsResilienceSource";
+
+    private static final String SINK_TOPIC = "streamsResilienceSink";
+
+    public static void main(String[] args) {
+
+        System.out.println("StreamsTest instance started");
+
+        final String kafka = args.length > 0 ? args[0] : "localhost:9092";
+        final String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
+        final String additionalConfigs = args.length > 2 ? args[2] : null;
+
+        final Serde<String> stringSerde = Serdes.String();
+
+        final Properties streamsProperties = new Properties();
+        streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-resilience");
+        streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+
+
+        // it is expected that max.poll.interval, retries, request.timeout and max.block.ms are set
+        // in streams_broker_down_resilience_test and passed as args
+        if (additionalConfigs != null && !additionalConfigs.equalsIgnoreCase("none")) {
+            Map<String, String> updated = updatedConfigs(additionalConfigs);
+            System.out.println("Updating configs with " + updated);
+            streamsProperties.putAll(updated);
+        }
+
+        if (!confirmCorrectConfigs(streamsProperties)) {
+            System.err.println(String.format("ERROR: Did not have all required configs expected to contain %s %s %s %s",
+                                             StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
+                                             StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG),
+                                             StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                                             StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG)));
+
+            System.exit(1);
+        }
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream(Collections.singletonList(SOURCE_TOPIC_1), Consumed.with(stringSerde, stringSerde))
+            .peek(new ForeachAction<String, String>() {
+                @Override
+                public void apply(String key, String value) {
+                    System.out.println("received key " + key + " and value " + value);
+                }
+            }).to(SINK_TOPIC);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties);
+
+        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(final Thread t, final Throwable e) {
+                System.err.println("FATAL: An unexpected exception " + e);
+                System.err.flush();
+                streams.close(30, TimeUnit.SECONDS);
+            }
+        });
+        System.out.println("Start Kafka Streams");
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                System.out.println("Shutting down streams now");
+                streams.close(10, TimeUnit.SECONDS);
+            }
+        }));
+
+
+    }
+
+    private static boolean confirmCorrectConfigs(Properties properties) {
+        return properties.containsKey(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) &&
+               properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)) &&
+               properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG)) &&
+               properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG));
+    }
+
+    /**
+     * Takes a string with keys and values separated by '=' and each key value pair
+     * separated by ',' for example max.block.ms=5000,retries=6,request.timeout.ms=6000
+     *
+     * @param formattedConfigs the formatted config string
+     * @return HashMap with keys and values inserted
+     */
+    private static Map<String, String> updatedConfigs(String formattedConfigs) {
+        String[] parts = formattedConfigs.split(",");
+        Map<String, String> updatedConfigs = new HashMap<>();
+        for (String part : parts) {
+            String[] keyValue = part.split("=");
+            updatedConfigs.put(keyValue[KEY], keyValue[VALUE]);
+        }
+        return updatedConfigs;
+    }
+
+}
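
For reference, the string handed to this class in args[2] uses the key=value,key=value format described above; a minimal sketch of the round trip, inlining the same parsing logic (not an additional API):

    import java.util.HashMap;
    import java.util.Map;

    public class ConfigStringSketch {
        public static void main(final String[] args) {
            // the same string the system test below assembles at runtime
            final String formatted = "consumer.max.poll.interval.ms=50000,producer.retries=2,"
                + "producer.request.timeout.ms=15000,producer.max.block.ms=30000";
            final Map<String, String> configs = new HashMap<>();
            for (final String part : formatted.split(",")) {
                final String[] keyValue = part.split("=");
                configs.put(keyValue[0], keyValue[1]);
            }
            System.out.println(configs.get("producer.retries")); // prints 2
        }
    }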

http://git-wip-us.apache.org/repos/asf/kafka/blob/f3b9afe6/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 3719feb..9c4bd87 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -207,3 +207,10 @@ class StreamsBrokerCompatibilityService(StreamsTestBaseService):
                                                                 kafka,
                                                                 "org.apache.kafka.streams.tests.BrokerCompatibilityTest",
                                                                 eosEnabled)
+
+class StreamsBrokerDownResilienceService(StreamsTestBaseService):
+    def __init__(self, test_context, kafka, configs):
+        super(StreamsBrokerDownResilienceService, self).__init__(test_context,
+                                                                 kafka,
+                                                                 "org.apache.kafka.streams.tests.StreamsBrokerDownResilienceTest",
+                                                                 configs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f3b9afe6/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
new file mode 100644
index 0000000..bd90d9f
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsBrokerDownResilienceService
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
+
+
+class StreamsBrokerDownResilience(Test):
+    """
+    This test validates that Streams is resilient to a broker
+    being down longer than specified timeouts in configs
+    """
+
+    inputTopic = "streamsResilienceSource"
+    outputTopic = "streamsResilienceSink"
+    num_messages = 5
+
+    def __init__(self, test_context):
+        super(StreamsBrokerDownResilience, self).__init__(test_context=test_context)
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context,
+                                  num_nodes=1,
+                                  zk=self.zk,
+                                  topics={
+                                      self.inputTopic: {'partitions': 1, 'replication-factor': 1},
+                                      self.outputTopic: {'partitions': 1, 'replication-factor': 1}
+                                  })
+
+    def get_consumer(self):
+        return VerifiableConsumer(self.test_context,
+                                  1,
+                                  self.kafka,
+                                  self.outputTopic,
+                                  "stream-broker-resilience-verify-consumer",
+                                  max_messages=self.num_messages)
+
+    def get_producer(self):
+        return VerifiableProducer(self.test_context,
+                                  1,
+                                  self.kafka,
+                                  self.inputTopic,
+                                  max_messages=self.num_messages,
+                                  acks=1)
+
+    def assert_produce_consume(self, test_state):
+        producer = self.get_producer()
+        producer.start()
+
+        wait_until(lambda: producer.num_acked > 0,
+                   timeout_sec=30,
+                   err_msg="At %s failed to send messages " % test_state)
+
+        consumer = self.get_consumer()
+        consumer.start()
+
+        wait_until(lambda: consumer.total_consumed() > 0,
+                   timeout_sec=60,
+                   err_msg="At %s streams did not process messages in 60 seconds " % test_state)
+
+    def setUp(self):
+        self.zk.start()
+
+    def test_streams_resilient_to_broker_down(self):
+        self.kafka.start()
+
+        # Consumer max.poll.interval > min(max.block.ms, (retries + 1) * request.timeout)
+        consumer_poll_ms = "consumer.max.poll.interval.ms=50000"
+        retries_config = "producer.retries=2"
+        request_timeout = "producer.request.timeout.ms=15000"
+        max_block_ms = "producer.max.block.ms=30000"
+
+        # Broker should be down for more than 2x of retries * request.timeout ms
+        # With retries * request.timeout = (2 * 15000) = 30 seconds, we'll set downtime to 70 seconds
+        broker_down_time_in_seconds = 70
+
+        # java code expects configs in key=value,key=value format
+        updated_configs = consumer_poll_ms + "," + retries_config + "," + request_timeout + "," + max_block_ms
+
+        processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, updated_configs)
+        processor.start()
+
+        # until KIP-91 is merged we'll only send 5 messages to assert Kafka Streams is running before taking the broker down
+        # After KIP-91 is merged we'll continue to send messages the duration of the test
+        self.assert_produce_consume("before_broker_stop")
+
+        node = self.kafka.leader(self.inputTopic)
+
+        self.kafka.stop_node(node)
+
+        time.sleep(broker_down_time_in_seconds)
+
+        self.kafka.start_node(node)
+
+        self.assert_produce_consume("after_broker_stop")
+
+        self.kafka.stop()

