kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2274: verifiable consumer and integration testing
Date Tue, 10 Nov 2015 02:32:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c9264b4c8 -> bce664b42


KAFKA-2274: verifiable consumer and integration testing

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang, Geoff Anderson

Closes #465 from hachikuji/KAFKA-2274


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bce664b4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bce664b4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bce664b4

Branch: refs/heads/trunk
Commit: bce664b42a0c414dc19e2c07406cf6f14890cbd1
Parents: c9264b4
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon Nov 9 18:38:22 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Nov 9 18:38:22 2015 -0800

----------------------------------------------------------------------
 bin/kafka-verifiable-consumer.sh                |  20 +
 checkstyle/import-control.xml                   |   1 +
 .../kafka/clients/consumer/ConsumerRecords.java |  24 +-
 .../clients/consumer/internals/Fetcher.java     |   1 -
 .../kafka/coordinator/GroupCoordinator.scala    |   1 +
 tests/kafkatest/services/kafka/__init__.py      |   1 +
 tests/kafkatest/services/kafka/util.py          |  18 +
 tests/kafkatest/services/verifiable_consumer.py | 222 +++++++
 tests/kafkatest/services/verifiable_producer.py |  15 +-
 tests/kafkatest/tests/consumer_test.py          | 157 +++++
 .../apache/kafka/tools/VerifiableConsumer.java  | 611 +++++++++++++++++++
 11 files changed, 1059 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bce664b4/bin/kafka-verifiable-consumer.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-verifiable-consumer.sh b/bin/kafka-verifiable-consumer.sh
new file mode 100755
index 0000000..fae064e
--- /dev/null
+++ b/bin/kafka-verifiable-consumer.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
+    export KAFKA_HEAP_OPTS="-Xmx512M"
+fi
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/bce664b4/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 95ea3b7..908fd35 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -108,6 +108,7 @@
   </subpackage>
 
   <subpackage name="tools">
+    <allow pkg="org.apache.kafka.common"/>
     <allow pkg="org.apache.kafka.clients.producer" />
     <allow pkg="org.apache.kafka.clients.consumer" />
     <allow pkg="com.fasterxml.jackson" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/bce664b4/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 8e6fef4..8ee9be2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -20,6 +20,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * A container that holds the list {@link ConsumerRecord} per partition for a
@@ -27,8 +28,7 @@ import java.util.Map;
  * partition returned by a {@link Consumer#poll(long)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
-    public static final ConsumerRecords<Object, Object> EMPTY =
-            new ConsumerRecords<Object, Object>(Collections.EMPTY_MAP);
+    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.EMPTY_MAP);
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
 
@@ -41,12 +41,12 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
      * 
      * @param partition The partition to get records for
      */
-    public Iterable<ConsumerRecord<K, V>> records(TopicPartition partition) {
+    public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
         List<ConsumerRecord<K, V>> recs = this.records.get(partition);
         if (recs == null)
             return Collections.emptyList();
         else
-            return recs;
+            return Collections.unmodifiableList(recs);
     }
 
     /**
@@ -55,19 +55,27 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
     public Iterable<ConsumerRecord<K, V>> records(String topic) {
         if (topic == null)
             throw new IllegalArgumentException("Topic must be non-null.");
-        List<List<ConsumerRecord<K, V>>> recs = new ArrayList<List<ConsumerRecord<K, V>>>();
+        List<List<ConsumerRecord<K, V>>> recs = new ArrayList<>();
         for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
             if (entry.getKey().topic().equals(topic))
                 recs.add(entry.getValue());
         }
-        return new ConcatenatedIterable<K, V>(recs);
+        return new ConcatenatedIterable<>(recs);
+    }
+
+    /**
+     * Get the partitions which have records contained in this record set.
+     * @return the set of partitions with data in this record set (may be empty if no data was returned)
+     */
+    public Set<TopicPartition> partitions() {
+        return Collections.unmodifiableSet(records.keySet());
     }
 
     @Override
     public Iterator<ConsumerRecord<K, V>> iterator() {
-        return new ConcatenatedIterable<K, V>(records.values()).iterator();
+        return new ConcatenatedIterable<>(records.values()).iterator();
     }
-    
+
     /**
      * The number of records for all topics
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/bce664b4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 5907aca..4f0fbed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -596,7 +596,6 @@ public class Fetcher<K, V> {
         } catch (RuntimeException e) {
             throw new KafkaException("Error deserializing key/value for partition " + partition + " at offset " + logEntry.offset(), e);
         }
-
     }
 
     private static class PartitionRecords<K, V> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bce664b4/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 2acc223..e9c3c01 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -273,6 +273,7 @@ class GroupCoordinator(val brokerId: Int,
 
             // if this is the leader, then we can attempt to persist state and transition to stable
             if (memberId == group.leaderId) {
+              info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
 
               // fill any missing members with an empty assignment
               val missing = group.allMembers -- groupAssignment.keySet

http://git-wip-us.apache.org/repos/asf/kafka/blob/bce664b4/tests/kafkatest/services/kafka/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/__init__.py b/tests/kafkatest/services/kafka/__init__.py
index 6408b59..cd29e4b 100644
--- a/tests/kafkatest/services/kafka/__init__.py
+++ b/tests/kafkatest/services/kafka/__init__.py
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 from kafka import KafkaService
+from util import TopicPartition

http://git-wip-us.apache.org/repos/asf/kafka/blob/bce664b4/tests/kafkatest/services/kafka/util.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/util.py b/tests/kafkatest/services/kafka/util.py
new file mode 100644
index 0000000..983f588
--- /dev/null
+++ b/tests/kafkatest/services/kafka/util.py
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from collections import namedtuple
+
+TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])

http://git-wip-us.apache.org/repos/asf/kafka/blob/bce664b4/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
new file mode 100644
index 0000000..7d76166
--- /dev/null
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -0,0 +1,222 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
+from kafkatest.services.kafka.version import TRUNK
+from kafkatest.services.security.security_config import SecurityConfig
+from kafkatest.services.kafka import TopicPartition
+
+from collections import namedtuple
+import json
+import os
+import subprocess
+import time
+import signal
+
+class VerifiableConsumer(BackgroundThreadService):
+    PERSISTENT_ROOT = "/mnt/verifiable_consumer"
+    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.stdout")
+    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.stderr")
+    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+    LOG_FILE = os.path.join(LOG_DIR, "verifiable_consumer.log")
+    LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.properties")
+
+    logs = {
+        "verifiable_consumer_stdout": {
+            "path": STDOUT_CAPTURE,
+            "collect_default": False},
+        "verifiable_consumer_stderr": {
+            "path": STDERR_CAPTURE,
+            "collect_default": False},
+        "verifiable_consumer_log": {
+            "path": LOG_FILE,
+            "collect_default": True}
+        }
+
+    def __init__(self, context, num_nodes, kafka, topic, group_id,
+                 max_messages=-1, session_timeout=30000, version=TRUNK):
+        super(VerifiableConsumer, self).__init__(context, num_nodes)
+        self.log_level = "TRACE"
+        
+        self.kafka = kafka
+        self.topic = topic
+        self.group_id = group_id
+        self.max_messages = max_messages
+        self.session_timeout = session_timeout
+
+        self.assignment = {}
+        self.joined = set()
+        self.total_records = 0
+        self.consumed_positions = {}
+        self.committed_offsets = {}
+        self.revoked_count = 0
+        self.assigned_count = 0
+
+        for node in self.nodes:
+            node.version = version
+
+        self.prop_file = ""
+        self.security_config = kafka.security_config.client_config(self.prop_file)
+        self.prop_file += str(self.security_config)
+
+    def _worker(self, idx, node):
+        node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
+
+        # Create and upload log properties
+        log_config = self.render('tools_log4j.properties', log_file=VerifiableConsumer.LOG_FILE)
+        node.account.create_file(VerifiableConsumer.LOG4J_CONFIG, log_config)
+
+        # Create and upload config file
+        self.logger.info("verifiable_consumer.properties:")
+        self.logger.info(self.prop_file)
+        node.account.create_file(VerifiableConsumer.CONFIG_FILE, self.prop_file)
+        self.security_config.setup_node(node)
+
+        cmd = self.start_cmd(node)
+        self.logger.debug("VerifiableConsumer %d command: %s" % (idx, cmd))
+
+        for line in node.account.ssh_capture(cmd):
+            event = self.try_parse_json(line.strip())
+            if event is not None:
+                with self.lock:
+                    name = event["name"]
+                    if name == "shutdown_complete":
+                        self._handle_shutdown_complete(node)
+                    if name == "offsets_committed":
+                        self._handle_offsets_committed(node, event)
+                    elif name == "records_consumed":
+                        self._handle_records_consumed(node, event)
+                    elif name == "partitions_revoked":
+                        self._handle_partitions_revoked(node, event)
+                    elif name == "partitions_assigned":
+                        self._handle_partitions_assigned(node, event)
+
+    def _handle_shutdown_complete(self, node):
+        if node in self.joined:
+            self.joined.remove(node)
+
+    def _handle_offsets_committed(self, node, event):
+        if event["success"]:
+            for offset_commit in event["offsets"]:
+                topic = offset_commit["topic"]
+                partition = offset_commit["partition"]
+                tp = TopicPartition(topic, partition)
+                self.committed_offsets[tp] = offset_commit["offset"]
+
+    def _handle_records_consumed(self, node, event):
+        for topic_partition in event["partitions"]:
+            topic = topic_partition["topic"]
+            partition = topic_partition["partition"]
+            tp = TopicPartition(topic, partition)
+            self.consumed_positions[tp] = topic_partition["maxOffset"] + 1
+        self.total_records += event["count"]
+
+    def _handle_partitions_revoked(self, node, event):
+        self.revoked_count += 1
+        self.assignment[node] = []
+        if node in self.joined:
+            self.joined.remove(node)
+
+    def _handle_partitions_assigned(self, node, event):
+        self.assigned_count += 1
+        self.joined.add(node)
+        assignment =[]
+        for topic_partition in event["partitions"]:
+            topic = topic_partition["topic"]
+            partition = topic_partition["partition"]
+            assignment.append(TopicPartition(topic, partition))
+        self.assignment[node] = assignment
+
+    def start_cmd(self, node):
+        cmd = ""
+        cmd += "export LOG_DIR=%s;" % VerifiableConsumer.LOG_DIR
+        cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
+        cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG
+        cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer" \
+              " --group-id %s --topic %s --broker-list %s --session-timeout %s" % \
+              (self.group_id, self.topic, self.kafka.bootstrap_servers(), self.session_timeout)
+        if self.max_messages > 0:
+            cmd += " --max-messages %s" % str(self.max_messages)
+
+        cmd += " --consumer.config %s" % VerifiableConsumer.CONFIG_FILE
+        cmd += " 2>> %s | tee -a %s &" % (VerifiableConsumer.STDOUT_CAPTURE, VerifiableConsumer.STDOUT_CAPTURE)
+        print(cmd)
+        return cmd
+
+    def pids(self, node):
+        try:
+            cmd = "jps | grep -i VerifiableConsumer | awk '{print $1}'"
+            pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
+            return pid_arr
+        except (subprocess.CalledProcessError, ValueError) as e:
+            return []
+
+    def try_parse_json(self, string):
+        """Try to parse a string as json. Return None if not parseable."""
+        try:
+            return json.loads(string)
+        except ValueError:
+            self.logger.debug("Could not parse as json: %s" % str(string))
+            return None
+
+    def kill_node(self, node, clean_shutdown=True, allow_fail=False):
+        if clean_shutdown:
+            sig = signal.SIGTERM
+        else:
+            sig = signal.SIGKILL
+        for pid in self.pids(node):
+            node.account.signal(pid, sig, allow_fail)
+
+        if not clean_shutdown:
+            self._handle_shutdown_complete(node)
+
+    def stop_node(self, node, clean_shutdown=True, allow_fail=False):
+        self.kill_node(node, clean_shutdown, allow_fail)
+        
+        if self.worker_threads is None:
+            return
+
+        # block until the corresponding thread exits
+        if len(self.worker_threads) >= self.idx(node):
+            # Need to guard this because stop is preemptively called before the worker threads are added and started
+            self.worker_threads[self.idx(node) - 1].join()
+
+    def clean_node(self, node):
+        self.kill_node(node, clean_shutdown=False)
+        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
+        self.security_config.clean_node(node)
+
+    def current_assignment(self):
+        with self.lock:
+            return self.assignment
+
+    def position(self, tp):
+        with self.lock:
+            return self.consumed_positions[tp]
+
+    def owner(self, tp):
+        with self.lock:
+            for node, assignment in self.assignment.iteritems():
+                if tp in assignment:
+                    return node
+            return None
+
+    def committed(self, tp):
+        with self.lock:
+            return self.committed_offsets[tp]
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/bce664b4/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index bda6af2..9985bbd 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -21,6 +21,7 @@ from kafkatest.services.security.security_config import SecurityConfig
 
 import json
 import os
+import signal
 import subprocess
 import time
 
@@ -128,9 +129,17 @@ class VerifiableProducer(BackgroundThreadService):
         cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
         return cmd
 
+    def kill_node(self, node, clean_shutdown=True, allow_fail=False):
+        if clean_shutdown:
+            sig = signal.SIGTERM
+        else:
+            sig = signal.SIGKILL
+        for pid in self.pids(node):
+            node.account.signal(pid, sig, allow_fail)
+
     def pids(self, node):
         try:
-            cmd = "ps ax | grep -i VerifiableProducer | grep java | grep -v grep | awk '{print $1}'"
+            cmd = "jps | grep -i VerifiableProducer | awk '{print $1}'"
             pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
             return pid_arr
         except (subprocess.CalledProcessError, ValueError) as e:
@@ -160,7 +169,7 @@ class VerifiableProducer(BackgroundThreadService):
             return len(self.not_acked_values)
 
     def stop_node(self, node):
-        node.account.kill_process("VerifiableProducer", allow_fail=False)
+        self.kill_node(node, clean_shutdown=False, allow_fail=False)
         if self.worker_threads is None:
             return
 
@@ -170,7 +179,7 @@ class VerifiableProducer(BackgroundThreadService):
             self.worker_threads[self.idx(node) - 1].join()
 
     def clean_node(self, node):
-        node.account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False)
+        self.kill_node(node, clean_shutdown=False, allow_fail=False)
         node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
         self.security_config.clean_node(node)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bce664b4/tests/kafkatest/tests/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/consumer_test.py b/tests/kafkatest/tests/consumer_test.py
new file mode 100644
index 0000000..707ad2f
--- /dev/null
+++ b/tests/kafkatest/tests/consumer_test.py
@@ -0,0 +1,157 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
+from kafkatest.services.kafka import TopicPartition
+
+def partitions_for(topic, num_partitions):
+    partitions = set()
+    for i in range(num_partitions):
+        partitions.add(TopicPartition(topic=topic, partition=i))
+    return partitions
+
+
+class VerifiableConsumerTest(KafkaTest):
+
+    STOPIC = "simple_topic"
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 3
+    PARTITIONS = partitions_for(TOPIC, NUM_PARTITIONS)
+    GROUP_ID = "test_group_id"
+
+    def __init__(self, test_context):
+        super(VerifiableConsumerTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={
+            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },
+            self.STOPIC : { 'partitions': 1, 'replication-factor': 2 }
+        })
+        self.num_producers = 1
+        self.num_consumers = 2
+        self.session_timeout = 10000
+
+    def min_cluster_size(self):
+        """Override this since we're adding services outside of the constructor"""
+        return super(VerifiableConsumerTest, self).min_cluster_size() + self.num_consumers + self.num_producers
+
+    def _partitions(self, assignment):
+        partitions = []
+        for parts in assignment.itervalues():
+            partitions += parts
+        return partitions
+
+    def _valid_assignment(self, assignment):
+        partitions = self._partitions(assignment)
+        return len(partitions) == self.NUM_PARTITIONS and set(partitions) == self.PARTITIONS
+
+    def _setup_consumer(self, topic):
+        return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,
+                                  topic, self.GROUP_ID, session_timeout=self.session_timeout)
+
+    def _setup_producer(self, topic, max_messages=-1):
+        return VerifiableProducer(self.test_context, self.num_producers,
+                                  self.kafka, topic, max_messages=max_messages)
+
+    def _await_all_members(self, consumer):
+        # Wait until all members have joined the group
+        wait_until(lambda: len(consumer.joined) == self.num_consumers, timeout_sec=20,
+                   err_msg="Consumers failed to join in a reasonable amount of time")
+
+    def test_consumer_failure(self):
+        partition = TopicPartition(self.STOPIC, 0)
+        
+        consumer = self._setup_consumer(self.STOPIC)
+        producer = self._setup_producer(self.STOPIC)
+
+        consumer.start()
+        self._await_all_members(consumer)
+
+        partition_owner = consumer.owner(partition)
+        assert partition_owner is not None
+
+        # startup the producer and ensure that some records have been written
+        producer.start()
+        wait_until(lambda: producer.num_acked > 1000, timeout_sec=20,
+                   err_msg="Producer failed waiting for messages to be written")
+
+        # stop the partition owner and await its shutdown
+        consumer.kill_node(partition_owner, clean_shutdown=True)
+        wait_until(lambda: len(consumer.joined) == 1, timeout_sec=20,
+                   err_msg="Timed out waiting for consumer to close")
+
+        # ensure that the remaining consumer does some work after rebalancing
+        current_total_records = consumer.total_records
+        wait_until(lambda: consumer.total_records > current_total_records + 1000, timeout_sec=20,
+                   err_msg="Timed out waiting for additional records to be consumed after first consumer failed")
+
+        # if the total records consumed matches the current position,
+        # we haven't seen any duplicates
+        assert consumer.position(partition) == consumer.total_records
+        assert consumer.committed(partition) <= consumer.total_records
+
+    def test_broker_failure(self):
+        partition = TopicPartition(self.STOPIC, 0)
+        
+        consumer = self._setup_consumer(self.STOPIC)
+        producer = self._setup_producer(self.STOPIC)
+
+        producer.start()
+        consumer.start()
+        self._await_all_members(consumer)
+
+        # shutdown one of the brokers
+        self.kafka.signal_node(self.kafka.nodes[0])
+
+        # ensure that the remaining consumer does some work after broker failure
+        current_total_records = consumer.total_records
+        wait_until(lambda: consumer.total_records > current_total_records + 1000, timeout_sec=20,
+                   err_msg="Timed out waiting for additional records to be consumed after first consumer failed")
+
+        # if the total records consumed matches the current position,
+        # we haven't seen any duplicates
+        assert consumer.position(partition) == consumer.total_records
+        assert consumer.committed(partition) <= consumer.total_records
+
+    def test_simple_consume(self):
+        total_records = 1000
+
+        consumer = self._setup_consumer(self.STOPIC)
+        producer = self._setup_producer(self.STOPIC, max_messages=total_records)
+
+        partition = TopicPartition(self.STOPIC, 0)
+
+        consumer.start()
+        self._await_all_members(consumer)
+
+        producer.start()
+        wait_until(lambda: producer.num_acked == total_records, timeout_sec=20,
+                   err_msg="Producer failed waiting for messages to be written")
+
+        wait_until(lambda: consumer.committed(partition) == total_records, timeout_sec=10,
+                   err_msg="Consumer failed to read all expected messages")
+
+        assert consumer.position(partition) == total_records
+
+    def test_valid_assignment(self):
+        consumer = self._setup_consumer(self.TOPIC)
+        consumer.start()
+        self._await_all_members(consumer)
+        assert self._valid_assignment(consumer.current_assignment())
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/bce664b4/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
new file mode 100644
index 0000000..93c0bc6
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -0,0 +1,611 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+
+/**
+ * Command line consumer designed for system testing. It outputs consumer events to STDOUT as JSON
+ * formatted objects. The "name" field in each JSON event identifies the event type. The following
+ * events are currently supported:
+ *
+ * <ul>
+ * <li>partitions_revoked: outputs the partitions revoked through {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}.
+ *     See {@link org.apache.kafka.tools.VerifiableConsumer.PartitionsRevoked}</li>
+ * <li>partitions_assigned: outputs the partitions assigned through {@link ConsumerRebalanceListener#onPartitionsAssigned(Collection)}
+ *     See {@link org.apache.kafka.tools.VerifiableConsumer.PartitionsAssigned}.</li>
+ * <li>records_consumed: contains a summary of records consumed in a single call to {@link KafkaConsumer#poll(long)}.
+ *     See {@link org.apache.kafka.tools.VerifiableConsumer.RecordsConsumed}.</li>
+ * <li>record_data: contains the key, value, and offset of an individual consumed record (only included if verbose
+ *     output is enabled). See {@link org.apache.kafka.tools.VerifiableConsumer.RecordData}.</li>
+ * <li>offsets_committed: The result of every offset commit (only included if auto-commit is not enabled).
+ *     See {@link org.apache.kafka.tools.VerifiableConsumer.OffsetsCommitted}</li>
+ * <li>shutdown_complete: emitted after the consumer returns from {@link KafkaConsumer#close()}.
+ *     See {@link org.apache.kafka.tools.VerifiableConsumer.ShutdownComplete}.</li>
+ * </ul>
+ */
+public class VerifiableConsumer implements Closeable, OffsetCommitCallback, ConsumerRebalanceListener {
+
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final PrintStream out;
+    private final KafkaConsumer<String, String> consumer;
+    private final String topic;
+    private final boolean useAutoCommit;
+    private final boolean useAsyncCommit;
+    private final boolean verbose;
+    private final int maxMessages;
+    private int consumedMessages = 0;
+
+    private CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+    public VerifiableConsumer(KafkaConsumer<String, String> consumer,
+                              PrintStream out,
+                              String topic,
+                              int maxMessages,
+                              boolean useAutoCommit,
+                              boolean useAsyncCommit,
+                              boolean verbose) {
+        this.consumer = consumer;
+        this.out = out;
+        this.topic = topic;
+        this.maxMessages = maxMessages;
+        this.useAutoCommit = useAutoCommit;
+        this.useAsyncCommit = useAsyncCommit;
+        this.verbose = verbose;
+        addKafkaSerializerModule();
+    }
+
+    private void addKafkaSerializerModule() {
+        SimpleModule kafka = new SimpleModule();
+        kafka.addSerializer(TopicPartition.class, new JsonSerializer<TopicPartition>() {
+            @Override
+            public void serialize(TopicPartition tp, JsonGenerator gen, SerializerProvider serializers) throws IOException {
+                gen.writeStartObject();
+                gen.writeObjectField("topic", tp.topic());
+                gen.writeObjectField("partition", tp.partition());
+                gen.writeEndObject();
+            }
+        });
+        mapper.registerModule(kafka);
+    }
+
+    private boolean hasMessageLimit() {
+        return maxMessages >= 0;
+    }
+
+    private boolean isFinished() {
+        return hasMessageLimit() && consumedMessages >= maxMessages;
+    }
+
+    private Map<TopicPartition, OffsetAndMetadata> onRecordsReceived(ConsumerRecords<String, String> records) {
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+
+        List<RecordSetSummary> summaries = new ArrayList<>();
+        for (TopicPartition tp : records.partitions()) {
+            List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
+
+            if (hasMessageLimit() && consumedMessages + partitionRecords.size() > maxMessages)
+                partitionRecords = partitionRecords.subList(0, maxMessages - consumedMessages);
+
+            if (partitionRecords.isEmpty())
+                continue;
+
+            long minOffset = partitionRecords.get(0).offset();
+            long maxOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+
+            offsets.put(tp, new OffsetAndMetadata(maxOffset + 1));
+            summaries.add(new RecordSetSummary(tp.topic(), tp.partition(),
+                    partitionRecords.size(), minOffset, maxOffset));
+
+            if (verbose) {
+                for (ConsumerRecord<String, String> record : partitionRecords)
+                    printJson(new RecordData(record));
+            }
+
+            consumedMessages += partitionRecords.size();
+            if (isFinished())
+                break;
+        }
+
+        printJson(new RecordsConsumed(records.count(), summaries));
+        return offsets;
+    }
+
+    @Override
+    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+        List<CommitData> committedOffsets = new ArrayList<>();
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> offsetEntry : offsets.entrySet()) {
+            TopicPartition tp = offsetEntry.getKey();
+            committedOffsets.add(new CommitData(tp.topic(), tp.partition(), offsetEntry.getValue().offset()));
+        }
+
+        boolean success = true;
+        String error = null;
+        if (exception != null) {
+            success = false;
+            error = exception.getMessage();
+        }
+        printJson(new OffsetsCommitted(committedOffsets, error, success));
+    }
+
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        printJson(new PartitionsAssigned(partitions));
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        printJson(new PartitionsRevoked(partitions));
+    }
+
+    private void printJson(Object data) {
+        try {
+            out.println(mapper.writeValueAsString(data));
+        } catch (JsonProcessingException e) {
+            out.println("Bad data can't be written as json: " + e.getMessage());
+        }
+    }
+
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        try {
+            consumer.commitSync(offsets);
+            onComplete(offsets, null);
+        } catch (WakeupException e) {
+            // we only call wakeup() once to close the consumer, so this recursion should be safe
+            commitSync(offsets);
+            throw e;
+        } catch (Exception e) {
+            onComplete(offsets, e);
+        }
+    }
+
+    public void run() {
+        try {
+            consumer.subscribe(Arrays.asList(topic), this);
+
+            while (true) {
+                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
+                Map<TopicPartition, OffsetAndMetadata> offsets = onRecordsReceived(records);
+
+                if (!useAutoCommit) {
+                    if (useAsyncCommit)
+                        consumer.commitAsync(offsets, this);
+                    else
+                        commitSync(offsets);
+                }
+            }
+        } catch (WakeupException e) {
+            // ignore, we are closing
+        } finally {
+            consumer.close();
+            printJson(new ShutdownComplete());
+            shutdownLatch.countDown();
+        }
+    }
+
+    public void close() {
+        boolean interrupted = false;
+        try {
+            consumer.wakeup();
+            while (true) {
+                try {
+                    shutdownLatch.await();
+                    return;
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted)
+                Thread.currentThread().interrupt();
+        }
+    }
+
+    private static abstract class ConsumerEvent {
+        @JsonProperty
+        public abstract String name();
+
+        @JsonProperty("class")
+        public String clazz() {
+            return VerifiableConsumer.class.getName();
+        }
+    }
+
+    private static class ShutdownComplete extends ConsumerEvent {
+
+        @Override
+        public String name() {
+            return "shutdown_complete";
+        }
+    }
+
+    private static class PartitionsRevoked extends ConsumerEvent {
+        private final Collection<TopicPartition> partitions;
+
+        public PartitionsRevoked(Collection<TopicPartition> partitions) {
+            this.partitions = partitions;
+        }
+
+        @JsonProperty
+        public Collection<TopicPartition> partitions() {
+            return partitions;
+        }
+
+        @Override
+        public String name() {
+            return "partitions_revoked";
+        }
+    }
+
+    private static class PartitionsAssigned extends ConsumerEvent {
+        private final Collection<TopicPartition> partitions;
+
+        public PartitionsAssigned(Collection<TopicPartition> partitions) {
+            this.partitions = partitions;
+        }
+
+        @JsonProperty
+        public Collection<TopicPartition> partitions() {
+            return partitions;
+        }
+
+        @Override
+        public String name() {
+            return "partitions_assigned";
+        }
+    }
+
+    public static class RecordsConsumed extends ConsumerEvent {
+        private final long count;
+        private final List<RecordSetSummary> partitionSummaries;
+
+        public RecordsConsumed(long count, List<RecordSetSummary> partitionSummaries) {
+            this.count = count;
+            this.partitionSummaries = partitionSummaries;
+        }
+
+        @Override
+        public String name() {
+            return "records_consumed";
+        }
+
+        @JsonProperty
+        public long count() {
+            return count;
+        }
+
+        @JsonProperty
+        public List<RecordSetSummary> partitions() {
+            return partitionSummaries;
+        }
+    }
+
+    public static class RecordData extends ConsumerEvent {
+
+        private final ConsumerRecord<String, String> record;
+
+        public RecordData(ConsumerRecord<String, String> record) {
+            this.record = record;
+        }
+
+        @Override
+        public String name() {
+            return "record_data";
+        }
+
+        @JsonProperty
+        public String topic() {
+            return record.topic();
+        }
+
+        @JsonProperty
+        public int partition() {
+            return record.partition();
+        }
+
+        @JsonProperty
+        public String key() {
+            return record.key();
+        }
+
+        @JsonProperty
+        public String value() {
+            return record.value();
+        }
+
+        @JsonProperty
+        public long offset() {
+            return record.offset();
+        }
+
+    }
+
+    private static class PartitionData {
+        private final String topic;
+        private final int partition;
+
+        public PartitionData(String topic, int partition) {
+            this.topic = topic;
+            this.partition = partition;
+        }
+
+        @JsonProperty
+        public String topic() {
+            return topic;
+        }
+
+        @JsonProperty
+        public int partition() {
+            return partition;
+        }
+    }
+
+    private static class OffsetsCommitted extends ConsumerEvent {
+
+        private final List<CommitData> offsets;
+        private final String error;
+        private final boolean success;
+
+        public OffsetsCommitted(List<CommitData> offsets, String error, boolean success) {
+            this.offsets = offsets;
+            this.error = error;
+            this.success = success;
+        }
+
+        @Override
+        public String name() {
+            return "offsets_committed";
+        }
+
+        @JsonProperty
+        public List<CommitData> offsets() {
+            return offsets;
+        }
+
+        @JsonProperty
+        @JsonInclude(JsonInclude.Include.NON_NULL)
+        public String error() {
+            return error;
+        }
+
+        @JsonProperty
+        public boolean success() {
+            return success;
+        }
+
+    }
+
+    private static class CommitData extends PartitionData {
+        private final long offset;
+
+        public CommitData(String topic, int partition, long offset) {
+            super(topic, partition);
+            this.offset = offset;
+        }
+
+        @JsonProperty
+        public long offset() {
+            return offset;
+        }
+    }
+
+    private static class RecordSetSummary extends PartitionData {
+        private final long count;
+        private final long minOffset;
+        private final long maxOffset;
+
+        public RecordSetSummary(String topic, int partition, long count, long minOffset, long maxOffset) {
+            super(topic, partition);
+            this.count = count;
+            this.minOffset = minOffset;
+            this.maxOffset = maxOffset;
+        }
+
+        @JsonProperty
+        public long count() {
+            return count;
+        }
+
+        @JsonProperty
+        public long minOffset() {
+            return minOffset;
+        }
+
+        @JsonProperty
+        public long maxOffset() {
+            return maxOffset;
+        }
+
+    }
+
+    private static ArgumentParser argParser() {
+        ArgumentParser parser = ArgumentParsers
+                .newArgumentParser("verifiable-consumer")
+                .defaultHelp(true)
+                .description("This tool consumes messages from a specific topic and emits consumer events (e.g. group rebalances, received messages, and offsets committed) as JSON objects to STDOUT.");
+
+        parser.addArgument("--broker-list")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
+                .dest("brokerList")
+                .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
+
+        parser.addArgument("--topic")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("TOPIC")
+                .help("Consumes messages from this topic.");
+
+        parser.addArgument("--group-id")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("GROUP_ID")
+                .dest("groupId")
+                .help("The groupId shared among members of the consumer group");
+
+        parser.addArgument("--max-messages")
+                .action(store())
+                .required(false)
+                .type(Integer.class)
+                .setDefault(-1)
+                .metavar("MAX-MESSAGES")
+                .dest("maxMessages")
+                .help("Consume this many messages. If -1 (the default), the consumer will consume until the process is killed externally");
+
+        parser.addArgument("--session-timeout")
+                .action(store())
+                .required(false)
+                .setDefault(30000)
+                .type(Integer.class)
+                .metavar("TIMEOUT_MS")
+                .dest("sessionTimeout")
+                .help("Set the consumer's session timeout");
+
+        parser.addArgument("--verbose")
+                .action(storeTrue())
+                .type(Boolean.class)
+                .metavar("VERBOSE")
+                .help("Enable to log individual consumed records");
+
+        parser.addArgument("--enable-autocommit")
+                .action(storeTrue())
+                .type(Boolean.class)
+                .metavar("ENABLE-AUTOCOMMIT")
+                .dest("useAutoCommit")
+                .help("Enable offset auto-commit on consumer");
+
+        parser.addArgument("--reset-policy")
+                .action(store())
+                .required(false)
+                .setDefault("earliest")
+                .type(String.class)
+                .dest("resetPolicy")
+                .help("Set reset policy (must be either 'earliest', 'latest', or 'none'");
+
+        parser.addArgument("--consumer.config")
+                .action(store())
+                .required(false)
+                .type(String.class)
+                .metavar("CONFIG_FILE")
+                .help("Consumer config properties file (config options shared with command line parameters will be overridden).");
+
+        return parser;
+    }
+
+    public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException {
+        Namespace res = parser.parseArgs(args);
+
+        String topic = res.getString("topic");
+        boolean useAutoCommit = res.getBoolean("useAutoCommit");
+        int maxMessages = res.getInt("maxMessages");
+        boolean verbose = res.getBoolean("verbose");
+        String configFile = res.getString("consumer.config");
+
+        Properties consumerProps = new Properties();
+        if (configFile != null) {
+            try {
+                consumerProps.putAll(Utils.loadProps(configFile));
+            } catch (IOException e) {
+                throw new ArgumentParserException(e.getMessage(), parser);
+            }
+        }
+
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId"));
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit);
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy"));
+        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout")));
+
+        StringDeserializer deserializer = new StringDeserializer();
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);
+
+        return new VerifiableConsumer(
+                consumer,
+                System.out,
+                topic,
+                maxMessages,
+                useAutoCommit,
+                false,
+                verbose);
+    }
+
+    public static void main(String[] args) {
+        ArgumentParser parser = argParser();
+        if (args.length == 0) {
+            parser.printHelp();
+            System.exit(0);
+        }
+
+        try {
+            final VerifiableConsumer consumer = createFromArgs(parser, args);
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    consumer.close();
+                }
+            });
+
+            consumer.run();
+        } catch (ArgumentParserException e) {
+            parser.handleError(e);
+            System.exit(1);
+        }
+    }
+
+}


Mime
View raw message