kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: MINOR: Add version check on enable-systest-events flag
Date Tue, 03 May 2016 04:42:24 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 548638af1 -> 3cfe73a2c


MINOR: Add version check on enable-systest-events flag

Recent patch adding enable-systest-events flag without any version check breaks all uses of
versioned console consumer. E.g. upgrade tests, compatibility tests etc.

Added a check to only apply the flag if running 0.10.0 or greater.

Author: Geoff Anderson <geoff@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1298 from granders/minor-systest-fix-versioned-console-consumer

(cherry picked from commit 5c47b9f80e2130ab4cc76e2645e0ac0215bf8abe)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3cfe73a2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3cfe73a2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3cfe73a2

Branch: refs/heads/0.10.0
Commit: 3cfe73a2cadc2321602a6777444282180484fa9a
Parents: 548638a
Author: Geoff Anderson <geoff@confluent.io>
Authored: Mon May 2 21:42:01 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon May 2 21:42:19 2016 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/console_consumer.py | 24 ++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfe73a2/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 37638e2..5a33052 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -17,9 +17,8 @@ from ducktape.utils.util import wait_until
 from ducktape.services.background_thread import BackgroundThreadService
 
 from kafkatest.services.kafka.directory import kafka_dir
-from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2, LATEST_0_9
+from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2, LATEST_0_9, V_0_10_0_0
 from kafkatest.services.monitor.jmx import JmxMixin
-from kafkatest.services.security.security_config import SecurityConfig
 
 import itertools
 import os
@@ -91,7 +90,8 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
 
     def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group",
new_consumer=False,
                  message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=TRUNK,
-                 client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=[]):
+                 client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=[],
+                 enable_systest_events=False):
         """
         Args:
             context:                    standard context
@@ -106,6 +106,8 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
                                         waiting for the consumer to stop is a pretty good
way to consume all messages
                                         in a topic.
             print_key                   if True, print each message's key in addition to
its value
+            enable_systest_events       if True, console consumer will print additional lifecycle-related
information
+                                        only available in 0.10.0 and later.
         """
         JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
         BackgroundThreadService.__init__(self, context, num_nodes)
@@ -128,6 +130,11 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
         self.print_key = print_key
         self.log_level = "TRACE"
 
+        self.enable_systest_events = enable_systest_events
+        if self.enable_systest_events:
+            # Only available in 0.10.0 and up
+            assert version >= V_0_10_0_0
+
     def prop_file(self, node):
         """Return a string which can be used to create a configuration file appropriate for
the given node."""
         # Process client configuration
@@ -184,9 +191,14 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
 
         # LoggingMessageFormatter was introduced after 0.9
         if node.version > LATEST_0_9:
-            cmd+=" --formatter kafka.tools.LoggingMessageFormatter"
+            cmd += " --formatter kafka.tools.LoggingMessageFormatter"
+
+        if self.enable_systest_events:
+            # enable systest events is only available in 0.10.0 and later
+            # check the assertion here as well, in case node.version has been modified
+            assert node.version >= V_0_10_0_0
+            cmd += " --enable-systest-events"
 
-        cmd += " --enable-systest-events"
         cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
         return cmd
 
@@ -228,7 +240,9 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
 
             for line in itertools.chain([first_line], consumer_output):
                 msg = line.strip()
+
                 if msg == "shutdown_complete":
+                    # Note that we can only rely on shutdown_complete message if running
0.10.0 or greater
                     if node in self.clean_shutdown_nodes:
                         raise Exception("Unexpected shutdown event from consumer, already
shutdown. Consumer index: %d" % idx)
                     self.clean_shutdown_nodes.add(node)


Mime
View raw message