kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2803: Add hard bounce system test for Kafka Connect.
Date Wed, 25 Nov 2015 01:39:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 395bf46dd -> d1053915f


KAFKA-2803: Add hard bounce system test for Kafka Connect.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Gwen Shapira

Closes #494 from ewencp/kafka-2803-connect-hard-bounce-system-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d1053915
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d1053915
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d1053915

Branch: refs/heads/trunk
Commit: d1053915f64aec7ea717bbeac9570b1f75e9a2b0
Parents: 395bf46
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Tue Nov 24 17:39:14 2015 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Nov 24 17:39:14 2015 -0800

----------------------------------------------------------------------
 .../kafkatest/tests/connect_distributed_test.py | 43 +++++++++++++-------
 .../templates/connect-distributed.properties    |  7 +++-
 2 files changed, 34 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d1053915/tests/kafkatest/tests/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect_distributed_test.py b/tests/kafkatest/tests/connect_distributed_test.py
index 4689f36..1f82e63 100644
--- a/tests/kafkatest/tests/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect_distributed_test.py
@@ -17,7 +17,8 @@ from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink
 from kafkatest.services.console_consumer import ConsoleConsumer
 from ducktape.utils.util import wait_until
-import hashlib, subprocess, json, itertools, time
+from ducktape.mark import matrix
+import subprocess, itertools, time
 from collections import Counter
 
 class ConnectDistributedTest(KafkaTest):
@@ -84,7 +85,8 @@ class ConnectDistributedTest(KafkaTest):
         wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST),
timeout_sec=70, err_msg="Sink output file never converged to the same state as the input file")
 
 
-    def test_clean_bounce(self):
+    @matrix(clean=[True, False])
+    def test_bounce(self, clean):
         """
         Validates that source and sink tasks that run continuously and produce a predictable
 sequence of messages
         run correctly and deliver messages exactly once (clean bounce) or at least once (hard
 bounce) when Kafka Connect workers undergo rolling bounces.
@@ -102,13 +104,19 @@ class ConnectDistributedTest(KafkaTest):
         for _ in range(3):
             for node in self.cc.nodes:
                 started = time.time()
-                self.logger.info("Cleanly bouncing Kafka Connect on " + str(node.account))
-                self.cc.stop_node(node)
+                self.logger.info("%s bouncing Kafka Connect on %s", clean and "Clean" or
"Hard", str(node.account))
+                self.cc.stop_node(node, clean_shutdown=clean)
                 with node.account.monitor_log(self.cc.LOG_FILE) as monitor:
                     self.cc.start_node(node)
                     monitor.wait_until("Starting connectors and tasks using config offset",
timeout_sec=90,
                                        err_msg="Kafka Connect worker didn't successfully
join group and start work")
                 self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds",
node.account, time.time() - started)
+                # If this is a hard bounce, give additional time for the consumer groups
to recover. If we don't give
+                # some time here, the next bounce may cause consumers to be shut down before
they have any time to process
+                # data and we can end up with zero data making it through the test.
+                if not clean:
+                    time.sleep(15)
+
 
         self.source.stop()
         self.sink.stop()
@@ -118,12 +126,14 @@ class ConnectDistributedTest(KafkaTest):
         # cleanly exited. Currently this only tests at least once delivery because the sink
task may not have consumed
         # all the messages generated by the source task. This needs to be done per-task since
seqnos are not unique across
         # tasks.
-        src_msgs = self.source.messages()
-        sink_msgs = self.sink.messages()
         success = True
         errors = []
+        allow_dups = not clean
+        src_messages = self.source.messages()
+        sink_messages = self.sink.messages()
         for task in range(num_tasks):
-            src_seqnos = [msg['seqno'] for msg in src_msgs if msg['task'] == task]
+            # Validate source messages
+            src_seqnos = [msg['seqno'] for msg in src_messages if msg['task'] == task]
             # Every seqno up to the largest one we ever saw should appear. Each seqno should
 only appear once on a clean
             # bounce because clean bouncing commits on rebalance; hard bounces may redeliver,
 so duplicates are tolerated then.
             src_seqno_max = max(src_seqnos)
@@ -136,12 +146,14 @@ class ConnectDistributedTest(KafkaTest):
                 self.logger.error("Missing source sequence numbers for task " + str(task))
                 errors.append("Found missing source sequence numbers for task %d: %s" % (task,
missing_src_seqnos))
                 success = False
-            if duplicate_src_seqnos:
+            if not allow_dups and duplicate_src_seqnos:
                 self.logger.error("Duplicate source sequence numbers for task " + str(task))
                 errors.append("Found duplicate source sequence numbers for task %d: %s" %
(task, duplicate_src_seqnos))
                 success = False
 
-            sink_seqnos = [msg['seqno'] for msg in sink_msgs if msg['task'] == task and 'flushed'
in msg]
+
+            # Validate sink messages
+            sink_seqnos = [msg['seqno'] for msg in sink_messages if msg['task'] == task and
'flushed' in msg]
             # Every seqno up to the largest one we ever saw should appear. Each seqno should
 only appear once on a clean
             # bounce because clean bouncing commits on rebalance; hard bounces may redeliver,
 so duplicates are tolerated then.
             sink_seqno_max = max(sink_seqnos)
@@ -154,17 +166,16 @@ class ConnectDistributedTest(KafkaTest):
                 self.logger.error("Missing sink sequence numbers for task " + str(task))
                 errors.append("Found missing sink sequence numbers for task %d: %s" % (task,
missing_sink_seqnos))
                 success = False
-            if duplicate_sink_seqnos:
-               self.logger.error("Duplicate sink sequence numbers for task " + str(task))
-               errors.append("Found duplicate sink sequence numbers for task %d: %s" % (task,
duplicate_sink_seqnos))
-               success = False
-
+            if not allow_dups and duplicate_sink_seqnos:
+                self.logger.error("Duplicate sink sequence numbers for task " + str(task))
+                errors.append("Found duplicate sink sequence numbers for task %d: %s" % (task,
duplicate_sink_seqnos))
+                success = False
 
+            # Validate source and sink match
             if sink_seqno_max > src_seqno_max:
                 self.logger.error("Found sink sequence number greater than any generated
 source sequence number for task %d: %d > %d", task, sink_seqno_max, src_seqno_max)
                 errors.append("Found sink sequence number greater than any generated source
 sequence number for task %d: %d > %d" % (task, sink_seqno_max, src_seqno_max))
                 success = False
-
             if src_seqno_max < 1000 or sink_seqno_max < 1000:
                 errors.append("Not enough messages were processed: source:%d sink:%d" % (src_seqno_max,
sink_seqno_max))
                 success = False
@@ -175,9 +186,11 @@ class ConnectDistributedTest(KafkaTest):
             consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic,
consumer_timeout_ms=1000, print_key=True)
             consumer_validator.run()
             self.mark_for_collect(consumer_validator, "consumer_stdout")
+
         assert success, "Found validation errors:\n" + "\n  ".join(errors)
 
 
+
     def _validate_file_output(self, input):
         input_set = set(input)
         # Output needs to be collected from all nodes because we can't be sure where the
tasks will be scheduled.

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1053915/tests/kafkatest/tests/templates/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/connect-distributed.properties b/tests/kafkatest/tests/templates/connect-distributed.properties
index 4a61b92..8a9f6c7 100644
--- a/tests/kafkatest/tests/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/templates/connect-distributed.properties
@@ -37,4 +37,9 @@ config.storage.topic={{ CONFIG_TOPIC }}
 # Make sure data gets flushed frequently so tests don't have to wait to ensure they see data
in output systems
 offset.flush.interval.ms=5000
 
-rest.advertised.host.name = {{ node.account.hostname }}
\ No newline at end of file
+rest.advertised.host.name = {{ node.account.hostname }}
+
+
+# Reduce session timeouts so tests that kill workers don't need to wait as long to recover
+session.timeout.ms=10000
+consumer.session.timeout.ms=10000
\ No newline at end of file


Mime
View raw message