kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: MINOR: Do not wait for first line of console consumer output since we now have a more reliable test using JMX
Date Tue, 18 Jul 2017 04:25:14 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 fab75162e -> 40e36fccb


MINOR: Do not wait for first line of console consumer output since we now have a more reliable
test using JMX

Waiting for the first line of output was added in KAFKA-2527 when JmxMixin was originally
added as a heuristic to
determine when the process was ready. We've since determined this is not good enough given
JmxTool's limitations
and now include a separate, more reliable check before starting JmxTool. This check is also
dangerous since a
consumer that is started before data is available in the topic, it won't output anything to
stdout and only logs
errors to a separate log file. This means we may have a long delay between starting the process
and starting JMX
monitoring.

Since we have a more reliable check for liveness via JMX now (and in cases that need it, partition
assignment
metrics via JMX), we should no longer need to wait for the first line of output.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Apurva Mehta <apurva@confluent.io>

Closes #3447 from ewencp/dont-wait-first-line-console-consumer

(cherry picked from commit d663005fddcebafba439473299dc4c6ad74c966c)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/40e36fcc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/40e36fcc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/40e36fcc

Branch: refs/heads/0.10.2
Commit: 40e36fccb6d6075e206347489391e46d505d6004
Parents: fab7516
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Mon Jul 17 21:24:45 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Jul 17 21:25:12 2017 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/console_consumer.py | 40 +++++++++++------------
 1 file changed, 19 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/40e36fcc/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index d55d012..53ed357 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -250,27 +250,25 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         self.logger.debug("Console consumer %d command: %s", idx, cmd)
 
         consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
-        first_line = next(consumer_output, None)
-
-        if first_line is not None:
-            self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
-            self.init_jmx_attributes()
-            self.start_jmx_tool(idx, node)
-
-            for line in itertools.chain([first_line], consumer_output):
-                msg = line.strip()
-                if msg == "shutdown_complete":
-                    # Note that we can only rely on shutdown_complete message if running
0.10.0 or greater
-                    if node in self.clean_shutdown_nodes:
-                        raise Exception("Unexpected shutdown event from consumer, already
shutdown. Consumer index: %d" % idx)
-                    self.clean_shutdown_nodes.add(node)
-                else:
-                    if self.message_validator is not None:
-                        msg = self.message_validator(msg)
-                    if msg is not None:
-                        self.messages_consumed[idx].append(msg)
-
-            self.read_jmx_output(idx, node)
+
+        self.init_jmx_attributes()
+        self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
+        self.start_jmx_tool(idx, node)
+
+        for line in consumer_output:
+            msg = line.strip()
+            if msg == "shutdown_complete":
+                # Note that we can only rely on shutdown_complete message if running 0.10.0
or greater
+                if node in self.clean_shutdown_nodes:
+                    raise Exception("Unexpected shutdown event from consumer, already shutdown.
Consumer index: %d" % idx)
+                self.clean_shutdown_nodes.add(node)
+            else:
+                if self.message_validator is not None:
+                    msg = self.message_validator(msg)
+                if msg is not None:
+                    self.messages_consumed[idx].append(msg)
+
+        self.read_jmx_output(idx, node)
 
     def start_node(self, node):
         BackgroundThreadService.start_node(self, node)


Mime
View raw message