kafka-commits mailing list archives

From ewe...@apache.org
Subject kafka git commit: KAFKA-3782: Fix transient failure in connect distributed bounce test
Date Fri, 22 Jul 2016 03:09:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f1b37eec7 -> f5df13627


KAFKA-3782: Fix transient failure in connect distributed bounce test

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1650 from hachikuji/KAFKA-3782


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f5df1362
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f5df1362
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f5df1362

Branch: refs/heads/trunk
Commit: f5df13627aaa6052a19e4cd7896e94730dac7f64
Parents: f1b37ee
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Jul 21 20:09:03 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu Jul 21 20:09:03 2016 -0700

----------------------------------------------------------------------
 .../tests/connect/connect_distributed_test.py        | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f5df1362/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index d4c4225..b9757ba 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -329,7 +329,7 @@ class ConnectDistributedTest(Test):
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
 
-        self.source = VerifiableSource(self.cc, tasks=num_tasks)
+        self.source = VerifiableSource(self.cc, tasks=num_tasks, throughput=100)
         self.source.start()
         self.sink = VerifiableSink(self.cc, tasks=num_tasks)
         self.sink.start()
@@ -344,11 +344,14 @@ class ConnectDistributedTest(Test):
                     monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90,
                                        err_msg="Kafka Connect worker didn't successfully join group and start work")
                 self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started)
-                # If this is a hard bounce, give additional time for the consumer groups to recover. If we don't give
-                # some time here, the next bounce may cause consumers to be shut down before they have any time to process
-                # data and we can end up with zero data making it through the test.
-                if not clean:
-                    time.sleep(15)
+
+                # Give additional time for the consumer groups to recover. Even if it is not a hard bounce, there are
+                # some cases where a restart can cause a rebalance to take the full length of the session timeout
+                # (e.g. if the client shuts down before it has received the memberId from its initial JoinGroup).
+                # If we don't give enough time for the group to stabilize, the next bounce may cause consumers to
+                # be shut down before they have any time to process data and we can end up with zero data making it
+                # through the test.
+                time.sleep(15)
 
 
         self.source.stop()
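
The pause that this patch makes unconditional can be illustrated outside the test harness. The following is a minimal sketch, not the kafkatest code: `bounce_all`, the `bounce` callable, and the injectable `sleep` parameter are hypothetical names introduced here for illustration, and only the 15-second value is taken from the diff.

```python
import time


def bounce_all(nodes, bounce, settle_sec=15, sleep=time.sleep):
    """Bounce each node in turn, pausing after every bounce.

    `bounce` is a hypothetical callable that restarts a single node.
    `settle_sec` mirrors the 15-second pause in the patch. `sleep` is
    injectable so the loop can be exercised without real delays.
    """
    events = []
    for node in nodes:
        bounce(node)
        events.append(("bounce", node))
        # Pause after every bounce, clean or hard, so the consumer group
        # has time to stabilize before the next node is restarted.
        sleep(settle_sec)
        events.append(("settle", settle_sec))
    return events
```

Calling `bounce_all(["w1", "w2"], bounce=lambda n: None, sleep=lambda s: None)` runs the loop without waiting. A fixed pause is the simplest option, which is what the patch uses; polling for group stability would be an alternative design that the patch does not adopt.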

