kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.4 updated: MINOR: Disable JmxTool in kafkatest console-consumer by default (#7785)
Date Fri, 10 Jan 2020 00:55:09 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 179d0d7  MINOR: Disable JmxTool in kafkatest console-consumer by default (#7785)
179d0d7 is described below

commit 179d0d73d65ab2c3eb8bc79c70b9893f07038447
Author: Brian Bushree <bbushree@confluent.io>
AuthorDate: Thu Jan 9 16:53:36 2020 -0800

    MINOR: Disable JmxTool in kafkatest console-consumer by default (#7785)
    
    Do not initialize `JmxTool` by default when running console consumer. In order to support
    this, we remove `has_partitions_assigned` and its only usage in an assertion inside
    `ProduceConsumeValidateTest`, which did not seem to contribute much to the validation.
    
    Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
---
 tests/kafkatest/services/console_consumer.py      | 26 -----------------------
 tests/kafkatest/tests/produce_consume_validate.py | 12 -----------
 2 files changed, 38 deletions(-)

diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 3aeed90..5fd4712 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -249,7 +249,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
 
         with self.lock:
-            self._init_jmx_attributes()
             self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
             self.start_jmx_tool(idx, node)
 
@@ -292,28 +291,3 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
 
     def java_class_name(self):
         return "ConsoleConsumer"
-
-    def has_partitions_assigned(self, node):
-        if self.new_consumer is False:
-            return False
-        idx = self.idx(node)
-        with self.lock:
-            self._init_jmx_attributes()
-            self.start_jmx_tool(idx, node)
-            self.read_jmx_output(idx, node)
-        if not self.assigned_partitions_jmx_attr in self.maximum_jmx_value:
-            return False
-        self.logger.debug("Number of partitions assigned %f" % self.maximum_jmx_value[self.assigned_partitions_jmx_attr])
-        return self.maximum_jmx_value[self.assigned_partitions_jmx_attr] > 0.0
-
-    def _init_jmx_attributes(self):
-        # Must hold lock
-        if self.new_consumer:
-            # We use a flag to track whether we're using this automatically generated ID because the service could be
-            # restarted multiple times and the client ID may be changed.
-            if getattr(self, '_automatic_metrics', False) or not self.jmx_object_names:
-                self._automatic_metrics = True
-                self.jmx_object_names = ["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s" % self.client_id]
-                self.jmx_attributes = ["assigned-partitions"]
-                self.assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions" % self.client_id
-
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index d915524..22aa096 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -18,8 +18,6 @@ from ducktape.utils.util import wait_until
 
 from kafkatest.utils import validate_delivery
 
-import time
-
 class ProduceConsumeValidateTest(Test):
     """This class provides a shared template for tests which follow the common pattern of:
 
@@ -56,20 +54,10 @@ class ProduceConsumeValidateTest(Test):
         if (self.consumer_init_timeout_sec > 0):
             self.logger.debug("Waiting %ds for the consumer to initialize.",
                               self.consumer_init_timeout_sec)
-            start = int(time.time())
             wait_until(lambda: self.consumer.alive(self.consumer.nodes[0]) is True,
                        timeout_sec=self.consumer_init_timeout_sec,
                        err_msg="Consumer process took more than %d s to fork" %\
                        self.consumer_init_timeout_sec)
-            end = int(time.time())
-            remaining_time = self.consumer_init_timeout_sec - (end - start)
-            if remaining_time < 0 :
-                remaining_time = 0
-            if self.consumer.new_consumer:
-                wait_until(lambda: self.consumer.has_partitions_assigned(self.consumer.nodes[0]) is True,
-                           timeout_sec=remaining_time,
-                           err_msg="Consumer process took more than %d s to have partitions assigned" %\
-                           remaining_time)
 
         self.producer.start()
         wait_until(lambda: self.producer.num_acked > 5,


Mime
View raw message