kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch trunk updated: MINOR: add wait_for_assigned_partitions to console-consumer (#8192)
Date Sun, 01 Mar 2020 00:44:21 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 72a5aa8  MINOR: add wait_for_assigned_partitions to console-consumer (#8192)
72a5aa8 is described below

commit 72a5aa8b0758f23b42c6e4453d6dc6c4861aa1ad
Author: Brian Bushree <bbushree@confluent.io>
AuthorDate: Sat Feb 29 16:43:51 2020 -0800

    MINOR: add wait_for_assigned_partitions to console-consumer (#8192)
    
    what/why
    the throttling_test was broken by PR #7785, since the test depends on the consumer having
    partitions assigned before the producer is started
    
    this PR provides the ability to wait for partitions to be assigned in the console consumer
before considering it started.
    
    caveat
    this does not support starting up the JmxTool inside the console-consumer for custom metrics
    while the wait_until_partitions_assigned flag is set, since the code assumes only one JmxTool
    is running per node.
    
    I think a proper fix for this would be to make JmxTool its own standalone single-node
service
    
    alternatives
    we could use the EndToEnd test suite, which uses the verifiable producer/consumer under
    the hood, but unfortunately I found that more changes were necessary to get that working
    (specifically, that test suite does not seem to play nicely with the ProducerPerformanceService)
    
    Reviewers: Mathew Wong <mwong@confluent.io>, Bill Bejeck <bbejeck.com>
---
 tests/kafkatest/services/console_consumer.py       | 28 ++++++++++++++++++++--
 tests/kafkatest/services/monitor/jmx.py            | 19 +++++++++++++--
 .../tests/core/fetch_from_follower_test.py         | 16 +------------
 tests/kafkatest/tests/core/throttling_test.py      |  3 ++-
 4 files changed, 46 insertions(+), 20 deletions(-)

diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 0811bcd..d349ed7 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -17,9 +17,10 @@ import itertools
 import os
 
 from ducktape.services.background_thread import BackgroundThreadService
+from ducktape.utils.util import wait_until
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
-from kafkatest.services.monitor.jmx import JmxMixin
+from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
 from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_9_0_0,
V_0_10_0_0, V_0_11_0_0, V_2_0_0
 
 """
@@ -62,7 +63,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
                  client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
                  enable_systest_events=False, stop_timeout_sec=35, print_timestamp=False,
print_partition=False,
                  isolation_level="read_uncommitted", jaas_override_variables=None,
-                 kafka_opts_override="", client_prop_file_override="", consumer_properties={}):
+                 kafka_opts_override="", client_prop_file_override="", consumer_properties={},
+                 wait_until_partitions_assigned=False):
         """
         Args:
             context:                    standard context
@@ -124,6 +126,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         self.kafka_opts_override = kafka_opts_override
         self.client_prop_file_override = client_prop_file_override
         self.consumer_properties = consumer_properties
+        self.wait_until_partitions_assigned = wait_until_partitions_assigned
 
 
     def prop_file(self, node):
@@ -273,8 +276,29 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         with self.lock:
             self.read_jmx_output(idx, node)
 
+    def _wait_until_partitions_assigned(self, node, timeout_sec=60):
+        if self.jmx_object_names is not None:
+            raise Exception("'wait_until_partitions_assigned' is not supported while using
'jmx_object_names'/'jmx_attributes'")
+        jmx_tool = JmxTool(self.context, jmx_poll_ms=100)
+        jmx_tool.jmx_object_names = ["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s"
% self.client_id]
+        jmx_tool.jmx_attributes = ["assigned-partitions"]
+        jmx_tool.assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions"
% self.client_id
+        jmx_tool.start_jmx_tool(self.idx(node), node)
+        assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions"
% self.client_id
+
+        def read_and_check():
+            jmx_tool.read_jmx_output(self.idx(node), node)
+            return assigned_partitions_jmx_attr in jmx_tool.maximum_jmx_value
+
+        wait_until(lambda: read_and_check(),
+                timeout_sec=timeout_sec,
+                backoff_sec=.5,
+                err_msg="consumer was not assigned partitions within %d seconds" % timeout_sec)
+
     def start_node(self, node):
         BackgroundThreadService.start_node(self, node)
+        if self.wait_until_partitions_assigned:
+            self._wait_until_partitions_assigned(node)
 
     def stop_node(self, node):
         self.logger.info("%s Stopping node %s" % (self.__class__.__name__, str(node.account)))
diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index c5b747d..2dcd369 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -17,6 +17,8 @@ import os
 
 from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.utils.util import wait_until
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.version import get_version, V_0_11_0_0, DEV_BRANCH
 
 class JmxMixin(object):
@@ -41,10 +43,11 @@ class JmxMixin(object):
         self.jmx_tool_log = os.path.join(root, "jmx_tool.log")
         self.jmx_tool_err_log = os.path.join(root, "jmx_tool.err.log")
 
-    def clean_node(self, node):
+    def clean_node(self, node, idx=None):
         node.account.kill_java_processes(self.jmx_class_name(), clean_shutdown=False,
                                          allow_fail=True)
-        idx = self.idx(node)
+        if idx is None:
+            idx = self.idx(node)
         self.started[idx-1] = False
         node.account.ssh("rm -f -- %s %s" % (self.jmx_tool_log, self.jmx_tool_err_log), allow_fail=False)
 
@@ -139,3 +142,15 @@ class JmxMixin(object):
 
     def jmx_class_name(self):
         return "kafka.tools.JmxTool"
+
+class JmxTool(JmxMixin, KafkaPathResolverMixin):
+    """
+    Simple helper class for using the JmxTool directly instead of as a mix-in
+    """
+    def __init__(self, text_context, *args, **kwargs):
+        JmxMixin.__init__(self, num_nodes=1, *args, **kwargs)
+        self.context = text_context
+
+    @property
+    def logger(self):
+        return self.context.logger
diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py b/tests/kafkatest/tests/core/fetch_from_follower_test.py
index fde1baf..ef37728 100644
--- a/tests/kafkatest/tests/core/fetch_from_follower_test.py
+++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py
@@ -18,29 +18,15 @@ from collections import defaultdict
 
 from ducktape.mark.resource import cluster
 
-from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.monitor.jmx import JmxMixin
+from kafkatest.services.monitor.jmx import JmxTool
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
 
 
-class JmxTool(JmxMixin, KafkaPathResolverMixin):
-    """
-    Simple helper class for using the JmxTool directly instead of as a mix-in
-    """
-    def __init__(self, text_context, *args, **kwargs):
-        JmxMixin.__init__(self, num_nodes=1, *args, **kwargs)
-        self.context = text_context
-
-    @property
-    def logger(self):
-        return self.context.logger
-
-
 class FetchFromFollowerTest(ProduceConsumeValidateTest):
 
     RACK_AWARE_REPLICA_SELECTOR = "org.apache.kafka.common.replica.RackAwareReplicaSelector"
diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py
index 4a8327e..f29ec2b 100644
--- a/tests/kafkatest/tests/core/throttling_test.py
+++ b/tests/kafkatest/tests/core/throttling_test.py
@@ -165,7 +165,8 @@ class ThrottlingTest(ProduceConsumeValidateTest):
                                         self.topic,
                                         consumer_timeout_ms=60000,
                                         message_validator=is_int,
-                                        from_beginning=False)
+                                        from_beginning=False,
+                                        wait_until_partitions_assigned=True)
 
         self.kafka.start()
         bulk_producer.run()


Mime
View raw message