kafka-commits mailing list archives

From ewe...@apache.org
Subject kafka git commit: MINOR: Pluggable verifiable clients
Date Wed, 22 Mar 2017 05:23:31 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 83824089f -> d5cec01f2


MINOR: Pluggable verifiable clients

This adds support for pluggable VerifiableConsumer and VerifiableProducer test client implementations,
allowing third-party clients to be used in place of the Java clients in kafkatests.

A new VerifiableClientMixin class is added and the standard Java Verifiable{Producer,Consumer}
classes have been changed to use it.
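
For illustration, a minimal third-party driver class built on the mixin might look like
the following sketch (the class name and client path are hypothetical; per the
verifiable_client.py documentation, the contract is to implement exec_cmd() and pids()):

    from kafkatest.services.verifiable_client import VerifiableClientMixin

    class MyVerifiableClient(VerifiableClientMixin):
        """Hypothetical kafkatest driver for a third-party verifiable client."""
        def __init__(self, parent, conf=None):
            super(MyVerifiableClient, self).__init__()
            self.parent = parent  # the VerifiableConsumer or VerifiableProducer service
            self.conf = conf      # the --globals Verifiable* object, if any

        def exec_cmd(self, node):
            # Environment variables are prepended and the standard Verifiable*
            # command line arguments appended to this string by start_cmd().
            return "/vagrant/myclient/kafkatest_verifiable_client"  # illustrative path

        def pids(self, node):
            # pid(s) of this client instance on the node
            return list(node.account.ssh_capture(
                "pgrep -f kafkatest_verifiable_client", allow_fail=True, callback=int))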

While third-party client drivers may be implemented as a complete class based on the mixin,
a simpler alternative that requires no kafkatest class implementation is available through
the VerifiableClientApp class, which uses ducktape's globals to specify the client app to
use (passed to ducktape through the `--globals <json>` command line argument).
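
For example, a hypothetical run plugging an external consumer binary into the
pluggable test (client path and deploy script are illustrative) might look like:

    ducktape tests/kafkatest/tests/client/pluggable_test.py \
        --globals '{"VerifiableConsumer": {
            "class": "kafkatest.services.verifiable_client.VerifiableClientApp",
            "exec_cmd": "/vagrant/myclient/kafkatest_verifiable_client",
            "deploy": "/vagrant/myclient/deploy.sh"}}'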

Some existing kafkatest clients for reference:
Go: https://github.com/confluentinc/confluent-kafka-go/tree/master/kafkatest
Python: https://github.com/confluentinc/confluent-kafka-python/tree/master/confluent_kafka/kafkatest
C++: https://github.com/edenhill/librdkafka/blob/0.9.2.x/examples/kafkatest_verifiable_client.cpp
C#/.NET: https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/test/Confluent.Kafka.VerifiableClient

This PR also contains documentation for the simple JSON-based Verifiable* client protocol.
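
As a sketch of that protocol, a conforming consumer emits newline-delimited JSON
objects on stdout along these lines (topic, offsets and counts illustrative):

    {"name": "startup_complete"}
    {"name": "partitions_assigned", "partitions": [{"topic": "test", "partition": 0}]}
    {"name": "records_consumed", "count": 1000, "partitions": [{"topic": "test", "partition": 0, "minOffset": 0, "maxOffset": 999}]}
    {"name": "offsets_committed", "success": true, "offsets": [{"topic": "test", "partition": 0, "offset": 1000}]}
    {"name": "shutdown_complete"}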

There are also some minor improvements that make troubleshooting failing client tests easier.

Author: Magnus Edenhill <magnus@edenhill.se>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2048 from edenhill/pluggable_verifiable_clients


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d5cec01f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d5cec01f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d5cec01f

Branch: refs/heads/trunk
Commit: d5cec01f2aeb37823d1a158683da0809a0c54818
Parents: 8382408
Author: Magnus Edenhill <magnus@edenhill.se>
Authored: Tue Mar 21 22:24:17 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Tue Mar 21 22:24:17 2017 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/verifiable_client.py   | 330 +++++++++++++++++++
 tests/kafkatest/services/verifiable_consumer.py |  65 ++--
 tests/kafkatest/services/verifiable_producer.py | 119 +++----
 .../client/consumer_rolling_upgrade_test.py     |   6 +-
 tests/kafkatest/tests/client/consumer_test.py   |   9 +-
 tests/kafkatest/tests/client/pluggable_test.py  |  51 +++
 .../apache/kafka/tools/VerifiableConsumer.java  |   9 +
 .../apache/kafka/tools/VerifiableProducer.java  |   7 +
 8 files changed, 487 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d5cec01f/tests/kafkatest/services/verifiable_client.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_client.py b/tests/kafkatest/services/verifiable_client.py
new file mode 100644
index 0000000..d8ffa35
--- /dev/null
+++ b/tests/kafkatest/services/verifiable_client.py
@@ -0,0 +1,330 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.directory_layout.kafka_path import TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
+from kafkatest.version import DEV_BRANCH, LATEST_0_8_2
+from ducktape.cluster.remoteaccount import RemoteCommandError
+
+import importlib
+import os
+import subprocess
+import signal
+
+
+"""This module abstracts the implementation of a verifiable client, allowing
+client developers to plug in their own client for all kafkatests that make
+use of either the VerifiableConsumer or VerifiableProducer classes.
+
+A verifiable client class must implement exec_cmd() and pids().
+
+This file provides:
+ * VerifiableClientMixin class: to be used for creating new verifiable client classes
+ * VerifiableClientJava class: the default Java verifiable clients
+ * VerifiableClientApp class: uses global configuration to specify
+   the command to execute and optional "pids" command, deploy script, etc.
+   Config syntax (pass as --globals <json_or_jsonfile>):
+      {"Verifiable(Producer|Consumer|Client)": {
+       "class": "kafkatest.services.verifiable_client.VerifiableClientApp",
+       "exec_cmd": "/vagrant/x/myclient --some --standard --args",
+       "pids": "pgrep -f ...", // optional
+       "deploy": "/vagrant/x/mydeploy.sh", // optional
+       "kill_signal": 2 // optional clean_shutdown kill signal (SIGINT in this case)
+      }}
+ * VerifiableClientDummy class: testing dummy
+
+
+
+==============================
+Verifiable client requirements
+==============================
+
+There are currently two verifiable client specifications:
+ * VerifiableConsumer
+ * VerifiableProducer
+
+Common requirements for both:
+ * One-way communication (client -> tests) through new-line delimited
+   JSON objects on stdout (details below).
+ * Log/debug to stderr
+
+Common communication for both:
+ * `{ "name": "startup_complete" }` - Client successfully started
+ * `{ "name": "shutdown_complete" }` - Client successfully terminated (after receiving SIGINT/SIGTERM)
+
+
+==================
+VerifiableConsumer
+==================
+
+Command line arguments:
+ * `--group-id <group-id>`
+ * `--topic <topic>`
+ * `--broker-list <brokers>`
+ * `--session-timeout <n>`
+ * `--enable-autocommit`
+ * `--max-messages <n>`
+ * `--assignment-strategy <s>`
+ * `--consumer.config <config-file>` - consumer config properties (typically empty)
+
+Environment variables:
+ * `LOG_DIR` - log output directory. Typically not needed if logs are written to stderr.
+ * `KAFKA_OPTS` - Security config properties (Java client syntax)
+ * `KAFKA_LOG4J_OPTS` - Java log4j options (can be ignored)
+
+Client communication:
+ * `{ "name": "offsets_committed",  "success": bool, "error": "<errstr>", "offsets": [ { "topic": "<t>", "partition": <p>, "offset": <o> } ] }` - offset commit results, should be emitted for each committed offset. Emit prior to partitions_revoked.
+ * `{ "name": "records_consumed", "partitions": [ { "topic": "<t>", "partition": <p>, "minOffset": <o>, "maxOffset": <o> } ], "count": <total_consumed> }` - per-partition delta stats from last records_consumed. Emit every 1000 messages, or 1s. Emit prior to partitions_assigned, partitions_revoked and offsets_committed.
+ * `{ "name": "partitions_revoked", "partitions": [ { "topic": "<t>", "partition": <p> } ] }` - rebalance: revoked partitions
+ * `{ "name": "partitions_assigned", "partitions": [ { "topic": "<t>", "partition": <p> } ] }` - rebalance: assigned partitions
+
+
+==================
+VerifiableProducer
+==================
+
+Command line arguments:
+ * `--topic <topic>`
+ * `--broker-list <brokers>`
+ * `--max-messages <n>`
+ * `--throughput <msgs/s>`
+ * `--producer.config <config-file>` - producer config properties (typically empty)
+
+Environment variables:
+ * `LOG_DIR` - log output directory. Typically not needed if logs are written to stderr.
+ * `KAFKA_OPTS` - Security config properties (Java client syntax)
+ * `KAFKA_LOG4J_OPTS` - Java log4j options (can be ignored)
+
+Client communication:
+ * `{ "name": "producer_send_error", "message": "<error msg>", "topic": "<t>", "key": "<msg key>", "value": "<msg value>" }` - emit on produce error.
+ * `{ "name": "producer_send_success", "topic": "<t>", "partition": <p>, "offset": <o>, "key": "<msg key>", "value": "<msg value>" }` - emit on produce success.
+
+
+
+===========
+Development
+===========
+
+**Logs:**
+During development of kafkatest clients it is generally a good idea to
+enable collection of the client's stdout and stderr logs for troubleshooting.
+Do this by setting "collect_default" to True for verifiable_consumer_stdout
+and .._stderr in verifiable_consumer.py and verifiable_producer.py.
+
+
+**Deployment:**
+There's currently no automatic way of deploying 3rd party kafkatest clients
+on the VM instance so this needs to be done (at least partially) manually for
+now.
+
+One way to do this is logging in to a worker (`vagrant ssh worker1`), downloading
+and building the kafkatest client under /vagrant (which maps to the kafka root
+directory on the host and is shared with all VM instances).
+Also make sure to install any system-level dependencies on each instance.
+
+Then use /vagrant/..../yourkafkatestclient as your run-time path since it will
+now be available on all instances.
+
+The VerifiableClientApp automates the per-worker deployment through the optional
+"deploy": "/vagrant/../deploy_script.sh" globals configuration property; this
+script will be called on the VM just prior to executing the client.
+"""
+
+def create_verifiable_client_implementation(context, parent):
+    """Factory for generating a verifiable client implementation class instance
+
+    :param parent: parent class instance, either VerifiableConsumer or VerifiableProducer
+
+    This will first check for a fully qualified client implementation class name
+    in context.globals as "Verifiable<type>" where <type> is "Producer" or "Consumer",
+    followed by "VerifiableClient" (which should implement both).
+    The global object layout is: {"class": "<full class name>", "..anything..": ..}.
+
+    If present, construct a new instance; otherwise default to VerifiableClientJava.
+    """
+
+    # Default class
+    obj = {"class": "kafkatest.services.verifiable_client.VerifiableClientJava"}
+
+    parent_name = parent.__class__.__name__.rsplit('.', 1)[-1]
+    for k in [parent_name, "VerifiableClient"]:
+        if k in context.globals:
+            obj = context.globals[k]
+            break
+
+    if "class" not in obj:
+        raise SyntaxError('%s (or VerifiableClient) expected object format: {"class": "full.class.path", ..}' % parent_name)
+
+    clname = obj["class"]
+    # Using the fully qualified classname, import the implementation class
+    if clname.find('.') == -1:
+        raise SyntaxError("%s (or VerifiableClient) must specify full class path (including module)" % parent_name)
+
+    (module_name, clname) = clname.rsplit('.', 1)
+    cluster_mod = importlib.import_module(module_name)
+    impl_class = getattr(cluster_mod, clname)
+    return impl_class(parent, obj)
+
+
+
+class VerifiableClientMixin (object):
+    """
+    Verifiable client mixin class
+    """
+    @property
+    def impl (self):
+        """
+        :return: Return (and create if necessary) the Verifiable client implementation object.
+        """
+        # Add _impl attribute to parent Verifiable(Consumer|Producer) object.
+        if not hasattr(self, "_impl"):
+            setattr(self, "_impl", create_verifiable_client_implementation(self.context, self))
+            if hasattr(self.context, "logger") and self.context.logger is not None:
+                self.context.logger.debug("Using client implementation %s for %s" % (self._impl.__class__.__name__, self.__class__.__name__))
+        return self._impl
+
+
+    def exec_cmd (self, node):
+        """
+        :return: command string to execute client.
+        Environment variables will be prepended and command line arguments
+        appended to this string later by start_cmd().
+
+        This method should also take care of deploying the client on the instance, if necessary.
+        """
+        raise NotImplementedError()
+
+    def pids (self, node):
+        """ :return: list of pids for this client instance on node """
+        raise NotImplementedError()
+
+    def kill_signal (self, clean_shutdown=True):
+        """ :return: the kill signal to terminate the application. """
+        if not clean_shutdown:
+            return signal.SIGKILL
+
+        return self.conf.get("kill_signal", signal.SIGTERM)
+
+
+class VerifiableClientJava (VerifiableClientMixin):
+    """
+    Verifiable Consumer and Producer using the official Java client.
+    """
+    def __init__(self, parent, conf=None):
+        """
+        :param parent: The parent instance, either VerifiableConsumer or VerifiableProducer
+        :param conf: Optional conf object (the --globals VerifiableX object)
+        """
+        super(VerifiableClientJava, self).__init__()
+        self.parent = parent
+        self.java_class_name = self.parent.__class__.__name__
+        self.conf = conf
+
+    def exec_cmd (self, node):
+        """ :return: command to execute to start instance
+        Translates Verifiable* to the corresponding Java client class name """
+        cmd = ""
+        if self.java_class_name == 'VerifiableProducer' and node.version <= LATEST_0_8_2:
+            # 0.8.2.X releases do not have VerifiableProducer.java, so cheat and add
+            # the tools jar from trunk to the classpath
+            tools_jar = self.parent.path.jar(TOOLS_JAR_NAME, DEV_BRANCH)
+            tools_dependant_libs_jar = self.parent.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
+            cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar
+            cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_dependant_libs_jar
+            cmd += "export CLASSPATH; "
+        cmd += self.parent.path.script("kafka-run-class.sh", node) + " org.apache.kafka.tools." + self.java_class_name
+        return cmd
+
+    def pids (self, node):
+        """ :return: pid(s) for this client instance on node """
+        try:
+            cmd = "jps | grep -i " + self.java_class_name + " | awk '{print $1}'"
+            pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
+            return pid_arr
+        except (RemoteCommandError, ValueError) as e:
+            return []
+
+
+class VerifiableClientDummy (VerifiableClientMixin):
+    """
+    Dummy class for testing the pluggable framework
+    """
+    def __init__(self, parent, conf=None):
+        """
+        :param parent: The parent instance, either VerifiableConsumer or VerifiableProducer
+        :param conf: Optional conf object (the --globals VerifiableX object)
+        """
+        super(VerifiableClientDummy, self).__init__()
+        self.parent = parent
+        self.conf = conf
+
+    def exec_cmd (self, node):
+        """ :return: command to execute to start instance """
+        return 'echo -e \'{"name": "shutdown_complete" }\n\' ; echo ARGS:'
+
+    def pids (self, node):
+        """ :return: pid(s) for this client instance on node """
+        return []
+
+
+class VerifiableClientApp (VerifiableClientMixin):
+    """
+    VerifiableClient using --global settings for exec_cmd, pids and deploy.
+    By using this a verifiable client application can be used through simple
+    --globals configuration rather than implementing a Python class.
+    """
+
+    def __init__(self, parent, conf):
+        """
+        :param parent: The parent instance, either VerifiableConsumer or VerifiableProducer
+        :param conf: Optional conf object (the --globals VerifiableX object)
+        """
+        super(VerifiableClientApp, self).__init__()
+        self.parent = parent
+        # "VerifiableConsumer" or "VerifiableProducer"
+        self.name = self.parent.__class__.__name__
+        self.conf = conf
+
+        if "exec_cmd" not in self.conf:
+            raise SyntaxError("%s requires \"exec_cmd\": .. to be set in --globals %s object" % \
+                              (self.__class__.__name__, self.name))
+
+    def exec_cmd (self, node):
+        """ :return: command to execute to start instance """
+        self.deploy(node)
+        return self.conf["exec_cmd"]
+
+    def pids (self, node):
+        """ :return: pid(s) for this client instance on node """
+
+        cmd = self.conf.get("pids", "pgrep -f '" + self.conf["exec_cmd"] + "'")
+        try:
+            pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
+            self.parent.context.logger.info("%s pids are: %s" % (str(node.account), pid_arr))
+            return pid_arr
+        except (subprocess.CalledProcessError, ValueError) as e:
+            return []
+
+    def deploy (self, node):
+        """ Call deploy script specified by the "deploy" --globals key.
+            This optional script is run on the VM instance just prior to
+            executing `exec_cmd` to deploy the kafkatest client.
+            The script path must be as seen by the VM instance, e.g. /vagrant/.... """
+
+        if "deploy" not in self.conf:
+            return
+
+        script_cmd = self.conf["deploy"]
+        self.parent.context.logger.debug("Deploying %s: %s" % (self, script_cmd))
+        r = node.account.ssh(script_cmd)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5cec01f/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 0139132..735a7bf 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -18,17 +18,18 @@ import os
 import signal
 
 from ducktape.services.background_thread import BackgroundThreadService
-from ducktape.cluster.remoteaccount import RemoteCommandError
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.kafka import TopicPartition
+from kafkatest.services.verifiable_client import VerifiableClientMixin
 from kafkatest.version import DEV_BRANCH
 
 
 class ConsumerState:
-    Dead = 1
+    Started = 1
+    Dead = 2
     Rebalancing = 3
-    Joined = 2
+    Joined = 4
 
 
 class ConsumerEventHandler(object):
@@ -48,9 +49,16 @@ class ConsumerEventHandler(object):
         self.assignment = []
         self.position = {}
 
-    def handle_offsets_committed(self, event):
+    def handle_startup_complete(self):
+        self.state = ConsumerState.Started
+
+    def handle_offsets_committed(self, event, node, logger):
         if event["success"]:
             for offset_commit in event["offsets"]:
+                if offset_commit.get("error", "") != "":
+                    logger.debug("%s: Offset commit failed for: %s" % (str(node.account), offset_commit))
+                    continue
+
                 topic = offset_commit["topic"]
                 partition = offset_commit["partition"]
                 tp = TopicPartition(topic, partition)
@@ -58,6 +66,7 @@ class ConsumerEventHandler(object):
                 assert tp in self.assignment, \
                     "Committed offsets for partition %s not assigned (current assignment: %s)" % \
                     (str(tp), str(self.assignment))
+                assert tp in self.position, "No previous position for %s: %s" % (str(tp), event)
                 assert self.position[tp] >= offset, \
                     "The committed offset %d was greater than the current position %d for partition %s" % \
                     (offset, self.position[tp], str(tp))
@@ -120,7 +129,8 @@ class ConsumerEventHandler(object):
             return None
 
 
-class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
+
+class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
     PERSISTENT_ROOT = "/mnt/verifiable_consumer"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.stdout")
     STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.stderr")
@@ -189,24 +199,28 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
         self.logger.debug("VerifiableConsumer %d command: %s" % (idx, cmd))
 
         for line in node.account.ssh_capture(cmd):
-            event = self.try_parse_json(line.strip())
+            event = self.try_parse_json(node, line.strip())
             if event is not None:
                 with self.lock:
                     name = event["name"]
                     if name == "shutdown_complete":
                         handler.handle_shutdown_complete()
-                    if name == "offsets_committed":
-                        handler.handle_offsets_committed(event)
+                    elif name == "startup_complete":
+                        handler.handle_startup_complete()
+                    elif name == "offsets_committed":
+                        handler.handle_offsets_committed(event, node, self.logger)
                         self._update_global_committed(event)
                     elif name == "records_consumed":
                         handler.handle_records_consumed(event)
-                        self._update_global_position(event)
+                        self._update_global_position(event, node)
                     elif name == "partitions_revoked":
                         handler.handle_partitions_revoked(event)
                     elif name == "partitions_assigned":
                         handler.handle_partitions_assigned(event)
+                    else:
+                        self.logger.debug("%s: ignoring unknown event: %s" % (str(node.account), event))
 
-    def _update_global_position(self, consumed_event):
+    def _update_global_position(self, consumed_event, node):
         for consumed_partition in consumed_event["partitions"]:
             tp = TopicPartition(consumed_partition["topic"], consumed_partition["partition"])
             if tp in self.global_committed:
@@ -218,8 +232,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
             # the consumer cannot generally guarantee that the position increases monotonically
             # without gaps in the face of hard failures, so we only log a warning when this happens
             if tp in self.global_position and self.global_position[tp] != consumed_partition["minOffset"]:
-                self.logger.warn("Expected next consumed offset of %d for partition %s, but instead saw %d" %
-                                 (self.global_position[tp], str(tp), consumed_partition["minOffset"]))
+                self.logger.warn("%s: Expected next consumed offset of %d for partition %s, but instead saw %d" %
+                                 (str(node.account), self.global_position[tp], str(tp), consumed_partition["minOffset"]))
 
             self.global_position[tp] = consumed_partition["maxOffset"] + 1
 
@@ -238,9 +252,9 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
         cmd += "export LOG_DIR=%s;" % VerifiableConsumer.LOG_DIR
         cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG
-        cmd += self.path.script("kafka-run-class.sh", node) + " org.apache.kafka.tools.VerifiableConsumer" \
-              " --group-id %s --topic %s --broker-list %s --session-timeout %s --assignment-strategy %s %s" % \
-                                            (self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol),
+        cmd += self.impl.exec_cmd(node)
+        cmd += " --group-id %s --topic %s --broker-list %s --session-timeout %s --assignment-strategy %s %s" % \
+               (self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol),
                self.session_timeout_sec*1000, self.assignment_strategy, "--enable-autocommit" if self.enable_autocommit else "")
                
         if self.max_messages > 0:
@@ -251,19 +265,14 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
         return cmd
 
     def pids(self, node):
-        try:
-            cmd = "jps | grep -i VerifiableConsumer | awk '{print $1}'"
-            pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
-            return pid_arr
-        except (RemoteCommandError, ValueError) as e:
-            return []
+        return self.impl.pids(node)
 
-    def try_parse_json(self, string):
+    def try_parse_json(self, node, string):
         """Try to parse a string as json. Return None if not parseable."""
         try:
             return json.loads(string)
         except ValueError:
-            self.logger.debug("Could not parse as json: %s" % str(string))
+            self.logger.debug("%s: Could not parse as json: %s" % (str(node.account), str(string)))
             return None
 
     def stop_all(self):
@@ -271,10 +280,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
             self.stop_node(node)
 
     def kill_node(self, node, clean_shutdown=True, allow_fail=False):
-        if clean_shutdown:
-            sig = signal.SIGTERM
-        else:
-            sig = signal.SIGKILL
+        sig = self.impl.kill_signal(clean_shutdown)
         for pid in self.pids(node):
             node.account.signal(pid, sig, allow_fail)
 
@@ -340,3 +346,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
         with self.lock:
             return [handler.node for handler in self.event_handlers.itervalues()
                     if handler.state == ConsumerState.Dead]
+
+    def alive_nodes(self):
+        with self.lock:
+            return [handler.node for handler in self.event_handlers.itervalues()
+                    if handler.state != ConsumerState.Dead]

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5cec01f/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 5f39df2..e32a3de 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -22,13 +22,15 @@ from ducktape.services.background_thread import BackgroundThreadService
 from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.utils.util import wait_until
 
-from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin, TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.verifiable_client import VerifiableClientMixin
 from kafkatest.utils import is_int, is_int_with_prefix
-from kafkatest.version import DEV_BRANCH, LATEST_0_8_2
+from kafkatest.version import DEV_BRANCH
 from kafkatest.utils.remote_account import line_count
 
 
-class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
+
+class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
     PERSISTENT_ROOT = "/mnt/verifiable_producer"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stdout")
     STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stderr")
@@ -123,55 +125,38 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
         self.produced_count[idx] = 0
         last_produced_time = time.time()
         prev_msg = None
-        node.account.ssh(cmd)
-
-        # Ensure that STDOUT_CAPTURE exists before try to read from it
-        # Note that if max_messages is configured, it's possible for the process to exit before this
-        # wait_until condition is checked
-        start = time.time()
-        wait_until(lambda: node.account.isfile(VerifiableProducer.STDOUT_CAPTURE) and
-                           line_count(node, VerifiableProducer.STDOUT_CAPTURE) > 0,
-                   timeout_sec=10, err_msg="%s: VerifiableProducer took too long to start" % node.account)
-        self.logger.debug("%s: VerifiableProducer took %s seconds to start" % (node.account, time.time() - start))
-
-        with node.account.open(VerifiableProducer.STDOUT_CAPTURE, 'r') as f:
-            while True:
-                line = f.readline()
-                if line == '' and not self.alive(node):
-                    # The process is gone, and we've reached the end of the output file, so we don't expect
-                    # any more output to appear in the STDOUT_CAPTURE file
-                    break
-
-                line = line.strip()
-
-                data = self.try_parse_json(line)
-                if data is not None:
-
-                    with self.lock:
-                        if data["name"] == "producer_send_error":
-                            data["node"] = idx
-                            self.not_acked_values.append(self.message_validator(data["value"]))
-                            self.produced_count[idx] += 1
-
-                        elif data["name"] == "producer_send_success":
-                            self.acked_values.append(self.message_validator(data["value"]))
-                            self.produced_count[idx] += 1
-
-                            # Log information if there is a large gap between successively acknowledged messages
-                            t = time.time()
-                            time_delta_sec = t - last_produced_time
-                            if time_delta_sec > 2 and prev_msg is not None:
-                                self.logger.debug(
-                                    "Time delta between successively acked messages is large: " +
-                                    "delta_t_sec: %s, prev_message: %s, current_message: %s" % (str(time_delta_sec), str(prev_msg), str(data)))
-
-                            last_produced_time = t
-                            prev_msg = data
-
-                        elif data["name"] == "shutdown_complete":
-                            if node in self.clean_shutdown_nodes:
-                            raise Exception("Unexpected shutdown event from producer, already shutdown. Producer index: %d" % idx)
-                            self.clean_shutdown_nodes.add(node)
+
+        for line in node.account.ssh_capture(cmd):
+            line = line.strip()
+
+            data = self.try_parse_json(line)
+            if data is not None:
+
+                with self.lock:
+                    if data["name"] == "producer_send_error":
+                        data["node"] = idx
+                        self.not_acked_values.append(self.message_validator(data["value"]))
+                        self.produced_count[idx] += 1
+
+                    elif data["name"] == "producer_send_success":
+                        self.acked_values.append(self.message_validator(data["value"]))
+                        self.produced_count[idx] += 1
+
+                        # Log information if there is a large gap between successively acknowledged messages
+                        t = time.time()
+                        time_delta_sec = t - last_produced_time
+                        if time_delta_sec > 2 and prev_msg is not None:
+                            self.logger.debug(
+                                "Time delta between successively acked messages is large: " +
+                                "delta_t_sec: %s, prev_message: %s, current_message: %s" % (str(time_delta_sec), str(prev_msg), str(data)))
+
+                        last_produced_time = t
+                        prev_msg = data
+
+                    elif data["name"] == "shutdown_complete":
+                        if node in self.clean_shutdown_nodes:
+                        raise Exception("Unexpected shutdown event from producer, already shutdown. Producer index: %d" % idx)
+                        self.clean_shutdown_nodes.add(node)
 
     def _has_output(self, node):
         """Helper used as a proxy to determine whether jmx is running by checking whether jmx_tool_log contains output."""
@@ -182,22 +167,10 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
             return True
 
     def start_cmd(self, node, idx):
-        cmd = ""
-        if node.version <= LATEST_0_8_2:
-            # 0.8.2.X releases do not have VerifiableProducer.java, so cheat and add
-            # the tools jar from the development branch to the classpath
-            tools_jar = self.path.jar(TOOLS_JAR_NAME, DEV_BRANCH)
-            tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
-
-            cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar
-            cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_dependant_libs_jar
-            cmd += "export CLASSPATH; "
-
-        cmd += "export LOG_DIR=%s;" % VerifiableProducer.LOG_DIR
+        cmd  = "export LOG_DIR=%s;" % VerifiableProducer.LOG_DIR
         cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG
-        cmd += " " + self.path.script("kafka-run-class.sh", node)
-        cmd += " org.apache.kafka.tools.VerifiableProducer"
+        cmd += self.impl.exec_cmd(node)
         cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol))
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
@@ -209,24 +182,16 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
             cmd += " --acks %s " % str(self.acks)
 
         cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
-        cmd += " 2>> %s 1>> %s &" % (VerifiableProducer.STDERR_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
+        cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDERR_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
         return cmd
 
     def kill_node(self, node, clean_shutdown=True, allow_fail=False):
-        if clean_shutdown:
-            sig = signal.SIGTERM
-        else:
-            sig = signal.SIGKILL
+        sig = self.impl.kill_signal(clean_shutdown)
         for pid in self.pids(node):
             node.account.signal(pid, sig, allow_fail)
 
     def pids(self, node):
-        try:
-            cmd = "jps | grep -i VerifiableProducer | awk '{print $1}'"
-            pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
-            return pid_arr
-        except (RemoteCommandError, ValueError) as e:
-            return []
+        return self.impl.pids(node)
 
     def alive(self, node):
         return len(self.pids(node)) > 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5cec01f/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
index e5904b1..638a3fc 100644
--- a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
@@ -36,13 +36,15 @@ class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
         assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
         assert assignment == set([
             frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
-            frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
+            frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])]), \
+            "Mismatched assignment: %s" % assignment
 
     def _verify_roundrobin_assignment(self, consumer):
         assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
         assert assignment == set([
             frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
-            frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
+            frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])]), \
+            "Mismatched assignment: %s" % assignment
 
     @cluster(num_nodes=4)
     def rolling_update_test(self):

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5cec01f/tests/kafkatest/tests/client/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index f66021d..999b85c 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -180,7 +180,8 @@ class OffsetValidationTest(VerifiableConsumerTest):
         # stop the partition owner and await its shutdown
         consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
         wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) != None,
-                   timeout_sec=self.session_timeout_sec+5, err_msg="Timed out waiting for consumer to close")
+                   timeout_sec=self.session_timeout_sec*2+5,
+                   err_msg="Timed out waiting for consumer to close")
 
         # ensure that the remaining consumer does some work after rebalancing
         self.await_consumed_messages(consumer, min_messages=1000)
@@ -309,5 +310,7 @@ class AssignmentValidationTest(VerifiableConsumerTest):
         for num_started, node in enumerate(consumer.nodes, 1):
             consumer.start_node(node)
             self.await_members(consumer, num_started)
-            assert self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment())
-
+            assert self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()), \
+                "expected valid assignments of %d partitions when num_started %d: %s" % \
+                (self.NUM_PARTITIONS, num_started, \
+                 [(str(node.account), a) for node, a in consumer.current_assignment().items()])

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5cec01f/tests/kafkatest/tests/client/pluggable_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/pluggable_test.py b/tests/kafkatest/tests/client/pluggable_test.py
new file mode 100644
index 0000000..a2599d8
--- /dev/null
+++ b/tests/kafkatest/tests/client/pluggable_test.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+
+class PluggableConsumerTest(VerifiableConsumerTest):
+    """ Verify that the pluggable client framework works. """
+
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 1
+
+    def __init__(self, test_context):
+        super(PluggableConsumerTest, self).__init__(test_context, num_consumers=1, num_producers=0,
+                                num_zk=1, num_brokers=1, topics={
+                                self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },
+        })
+
+    def test_start_stop(self):
+        """
+        Test that loading a pluggable VerifiableConsumer module works
+        """
+        consumer = self.setup_consumer(self.TOPIC)
+
+        for num_started, node in enumerate(consumer.nodes, 1):
+            consumer.start_node(node)
+
+        self.logger.debug("Waiting for %d nodes to start" % len(consumer.nodes))
+        wait_until(lambda: len(consumer.alive_nodes()) == len(consumer.nodes),
+                   timeout_sec=60,
+                   err_msg="Timed out waiting for consumers to start")
+        self.logger.debug("Started: %s" % str(consumer.alive_nodes()))
+        consumer.stop_all()
+
+        self.logger.debug("Waiting for %d nodes to stop" % len(consumer.nodes))
+        wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
+                   timeout_sec=self.session_timeout_sec+5,
+                   err_msg="Timed out waiting for consumers to shutdown")

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5cec01f/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 8bd6a45..5364720 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -215,6 +215,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
 
     public void run() {
         try {
+            printJson(new StartupComplete());
             consumer.subscribe(Collections.singletonList(topic), this);
 
             while (true) {
@@ -267,6 +268,14 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
         }
     }
 
+    private static class StartupComplete extends ConsumerEvent {
+
+        @Override
+        public String name() {
+            return "startup_complete";
+        }
+    }
+
     private static class ShutdownComplete extends ConsumerEvent {
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5cec01f/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index f8f7233..5ce6aba 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -251,6 +251,12 @@ public class VerifiableProducer {
         return toJsonString(data);
     }
 
+    String startupString() {
+        Map<String, Object> data = new HashMap<>();
+        data.put("name", "startup_complete");
+        return toJsonString(data);
+    }
+
     /**
      * Return JSON string encapsulating basic information about the exception, as well
      * as the key and value which triggered the exception.
@@ -352,6 +358,7 @@ public class VerifiableProducer {
         });
 
         ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
+        System.out.println(producer.startupString());
         long maxMessages = infinite ? Long.MAX_VALUE : producer.maxMessages;
         for (long i = 0; i < maxMessages; i++) {
             if (producer.stopProducing) {

