kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4558; throttling_test fails if the producer starts too fast
Date Sun, 05 Feb 2017 10:30:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 88809b9b8 -> d0932b028


KAFKA-4558; throttling_test fails if the producer starts too fast

With this change, the consumer will be considered initialized in the
ProduceConsumeValidate tests once its partitions have been assigned.

Author: Apurva Mehta <apurva.1618@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2347 from apurvam/KAFKA-4588-fix-race-between-producer-consumer-start


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0932b02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0932b02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0932b02

Branch: refs/heads/trunk
Commit: d0932b0286d8e69a704211aeaba9fa9503c0fb42
Parents: 88809b9
Author: Apurva Mehta <apurva.1618@gmail.com>
Authored: Sun Feb 5 10:24:43 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sun Feb 5 10:29:44 2017 +0000

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/JmxTool.scala   | 35 ++++++++++++++++++--
 tests/kafkatest/services/console_consumer.py    | 34 +++++++++++++++++--
 tests/kafkatest/services/monitor/jmx.py         |  6 ++--
 tests/kafkatest/tests/core/throttling_test.py   |  5 ++-
 .../kafkatest/tests/produce_consume_validate.py | 20 +++++++++--
 5 files changed, 89 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0932b02/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index a1ceb03..e980084 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -30,6 +30,13 @@ import scala.collection.mutable
 import scala.math._
 import kafka.utils.{CommandLineUtils, Exit, Logging}
 
+
+/**
+  * A program for reading JMX metrics from a given endpoint.
+  *
+  * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool. See KAFKA-4620 for
+  * details.
+  */
 object JmxTool extends Logging {
 
   def main(args: Array[String]) {
@@ -82,8 +89,32 @@ object JmxTool extends Logging {
     val attributesWhitelist = if(attributesWhitelistExists) Some(options.valueOf(attributesOpt).split(",")) else None
     val dateFormatExists = options.has(dateFormatOpt)
     val dateFormat = if(dateFormatExists) Some(new SimpleDateFormat(options.valueOf(dateFormatOpt))) else None
-    val jmxc = JMXConnectorFactory.connect(url, null)
-    val mbsc = jmxc.getMBeanServerConnection()
+
+    var jmxc: JMXConnector = null
+    var mbsc: MBeanServerConnection = null
+    var retries = 0
+    val maxNumRetries = 10
+    var connected = false
+    while (retries < maxNumRetries && !connected) {
+      try {
+        System.err.println(s"Trying to connect to JMX url: $url.")
+        jmxc = JMXConnectorFactory.connect(url, null)
+        mbsc = jmxc.getMBeanServerConnection
+        connected = true
+      } catch {
+        case e : Exception =>
+          System.err.println(s"Could not connect to JMX url: $url. Exception ${e.getMessage}.")
+          e.printStackTrace()
+          retries += 1
+          Thread.sleep(500)
+      }
+    }
+
+    if (!connected) {
+      System.err.println(s"Could not connect to JMX url $url after $maxNumRetries retries.")
+      System.err.println("Exiting.")
+      sys.exit(1)
+    }
 
     val queries: Iterable[ObjectName] =
       if(options.has(objectNameOpt))

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0932b02/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 94acb65..d55d012 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -73,6 +73,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
     LOG_FILE = os.path.join(LOG_DIR, "console_consumer.log")
     LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
     CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "console_consumer.properties")
+    JMX_TOOL_LOG = "/mnt/jmx_tool.log"
+    JMX_TOOL_ERROR_LOG = "/mnt/jmx_tool.err.log"
 
     logs = {
         "consumer_stdout": {
@@ -83,7 +85,13 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
             "collect_default": False},
         "consumer_log": {
             "path": LOG_FILE,
-            "collect_default": True}
+            "collect_default": True},
+        "jmx_log": {
+            "path" : JMX_TOOL_LOG,
+            "collect_default": False},
+        "jmx_err_log": {
+            "path": JMX_TOOL_ERROR_LOG,
+            "collect_default": False}
     }
 
     def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=True,
@@ -245,11 +253,12 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         first_line = next(consumer_output, None)
 
         if first_line is not None:
+            self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
+            self.init_jmx_attributes()
             self.start_jmx_tool(idx, node)
 
             for line in itertools.chain([first_line], consumer_output):
                 msg = line.strip()
-
                 if msg == "shutdown_complete":
                     # Note that we can only rely on shutdown_complete message if running 0.10.0 or greater
                     if node in self.clean_shutdown_nodes:
@@ -281,3 +290,24 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
         node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
         self.security_config.clean_node(node)
+
+    def has_partitions_assigned(self, node):
+       if self.new_consumer is False:
+          return False
+       idx = self.idx(node)
+       self.init_jmx_attributes()
+       self.start_jmx_tool(idx, node)
+       self.read_jmx_output(idx, node)
+       if not self.assigned_partitions_jmx_attr in self.maximum_jmx_value:
+           return False
+       self.logger.debug("Number of partitions assigned %f" % self.maximum_jmx_value[self.assigned_partitions_jmx_attr])
+       return self.maximum_jmx_value[self.assigned_partitions_jmx_attr] > 0.0
+
+    def init_jmx_attributes(self):
+        if self.new_consumer is True:
+            if self.jmx_object_names is None:
+                self.jmx_object_names = []
+                self.jmx_object_names += ["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s" % self.client_id]
+                self.jmx_attributes += ["assigned-partitions"]
+                self.assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions" % self.client_id
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0932b02/tests/kafkatest/services/monitor/jmx.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index e71040b..e64d03a 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -35,6 +35,7 @@ class JmxMixin(object):
         self.average_jmx_value = {}  # map from object_attribute_name to average value observed over time
 
         self.jmx_tool_log = "/mnt/jmx_tool.log"
+        self.jmx_tool_err_log = "/mnt/jmx_tool.err.log"
 
     def clean_node(self, node):
         node.account.kill_process("jmx", clean_shutdown=False, allow_fail=True)
@@ -55,11 +56,12 @@ class JmxMixin(object):
             cmd += " --object-name %s" % jmx_object_name
         for jmx_attribute in self.jmx_attributes:
             cmd += " --attributes %s" % jmx_attribute
-        cmd += " >> %s &" % self.jmx_tool_log
+        cmd += " 1>> %s" % self.jmx_tool_log
+        cmd += " 2>> %s &" % self.jmx_tool_err_log
 
         self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd))
         node.account.ssh(cmd, allow_fail=False)
-        wait_until(lambda: self._jmx_has_output(node), timeout_sec=5, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
+        wait_until(lambda: self._jmx_has_output(node), timeout_sec=10, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
         self.started[idx-1] = True
 
     def _jmx_has_output(self, node):

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0932b02/tests/kafkatest/tests/core/throttling_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py
index 80b5658..17a60ea 100644
--- a/tests/kafkatest/tests/core/throttling_test.py
+++ b/tests/kafkatest/tests/core/throttling_test.py
@@ -15,7 +15,7 @@
 
 import time
 import math
-from ducktape.mark import parametrize,ignore
+from ducktape.mark import parametrize
 from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 
@@ -138,10 +138,9 @@ class ThrottlingTest(ProduceConsumeValidateTest):
                 estimated_throttled_time,
                 time_taken))
 
-    @ignore
     @cluster(num_nodes=10)
-    @parametrize(bounce_brokers=False)
     @parametrize(bounce_brokers=True)
+    @parametrize(bounce_brokers=False)
     def test_throttled_reassignment(self, bounce_brokers):
         security_protocol = 'PLAINTEXT'
         self.kafka.security_protocol = security_protocol

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0932b02/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index 801ccde..cad9150 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -15,7 +15,7 @@
 
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
-
+import time
 
 class ProduceConsumeValidateTest(Test):
     """This class provides a shared template for tests which follow the common pattern of:
@@ -50,12 +50,28 @@ class ProduceConsumeValidateTest(Test):
         # Start background producer and consumer
         self.consumer.start()
         if (self.consumer_init_timeout_sec > 0):
-            self.logger.debug("Waiting %ds for the consumer to fork.",
+            self.logger.debug("Waiting %ds for the consumer to initialize.",
                               self.consumer_init_timeout_sec)
+            start = int(time.time())
             wait_until(lambda: self.consumer.alive(self.consumer.nodes[0]) is True,
                        timeout_sec=self.consumer_init_timeout_sec,
                        err_msg="Consumer process took more than %d s to fork" %\
                        self.consumer_init_timeout_sec)
+            end = int(time.time())
+            # If `JMXConnectFactory.connect` is invoked during the
+            # initialization of the JMX server, it may fail to throw the
+            # specified IOException back to the calling code. The sleep is a
+            # workaround that should allow initialization to complete before we
+            # try to connect. See KAFKA-4620 for more details.
+            time.sleep(1)
+            remaining_time = self.consumer_init_timeout_sec - (end - start)
+            if remaining_time < 0 :
+                remaining_time = 0
+            if self.consumer.new_consumer is True:
+                wait_until(lambda: self.consumer.has_partitions_assigned(self.consumer.nodes[0]) is True,
+                           timeout_sec=remaining_time,
+                           err_msg="Consumer process took more than %d s to have partitions assigned" %\
+                           remaining_time)
 
         self.producer.start()
         wait_until(lambda: self.producer.num_acked > 5,


Mime
View raw message