kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: Revert "MINOR: Do not wait for first line of console consumer output since we now have a more reliable test using JMX"
Date Thu, 20 Jul 2017 04:42:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 7f96aa36f -> 9b7af79a1


Revert "MINOR: Do not wait for first line of console consumer output since we now have a more reliable test using JMX"

This reverts commit 7f96aa36fbb1474e9bc5c8189640136cdc97df8f.

See https://issues.apache.org/jira/browse/KAFKA-5608 and https://github.com/apache/kafka/pull/3547
for more details.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b7af79a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b7af79a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b7af79a

Branch: refs/heads/0.10.1
Commit: 9b7af79a1441aef612e2e0eadbe057afcb5ce3c8
Parents: 7f96aa3
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Wed Jul 19 21:41:59 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Wed Jul 19 21:41:59 2017 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/console_consumer.py | 37 ++++++++++++-----------
 1 file changed, 20 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9b7af79a/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 1592379..dae6095 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -237,23 +237,26 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         self.logger.debug("Console consumer %d command: %s", idx, cmd)
 
         consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
-
-        self.start_jmx_tool(idx, node)
-
-        for line in consumer_output:
-            msg = line.strip()
-            if msg == "shutdown_complete":
-                # Note that we can only rely on shutdown_complete message if running 0.10.0 or greater
-                if node in self.clean_shutdown_nodes:
-                    raise Exception("Unexpected shutdown event from consumer, already shutdown. Consumer index: %d" % idx)
-                self.clean_shutdown_nodes.add(node)
-            else:
-                if self.message_validator is not None:
-                    msg = self.message_validator(msg)
-                if msg is not None:
-                    self.messages_consumed[idx].append(msg)
-
-        self.read_jmx_output(idx, node)
+        first_line = next(consumer_output, None)
+
+        if first_line is not None:
+            self.start_jmx_tool(idx, node)
+
+            for line in itertools.chain([first_line], consumer_output):
+                msg = line.strip()
+
+                if msg == "shutdown_complete":
+                    # Note that we can only rely on shutdown_complete message if running 0.10.0 or greater
+                    if node in self.clean_shutdown_nodes:
+                        raise Exception("Unexpected shutdown event from consumer, already shutdown. Consumer index: %d" % idx)
+                    self.clean_shutdown_nodes.add(node)
+                else:
+                    if self.message_validator is not None:
+                        msg = self.message_validator(msg)
+                    if msg is not None:
+                        self.messages_consumed[idx].append(msg)
+
+            self.read_jmx_output(idx, node)
 
     def start_node(self, node):
         BackgroundThreadService.start_node(self, node)


Mime
View raw message