kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-1888: rolling upgrade test
Date Tue, 27 Oct 2015 22:18:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk af42c3789 -> e6b343302


http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/performance/jmx_mixin.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/jmx_mixin.py b/tests/kafkatest/services/performance/jmx_mixin.py
deleted file mode 100644
index 7e19839..0000000
--- a/tests/kafkatest/services/performance/jmx_mixin.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-class JmxMixin(object):
-
-    def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=[]):
-        self.jmx_object_names = jmx_object_names
-        self.jmx_attributes = jmx_attributes
-        self.jmx_port = 9192
-
-        self.started = [False] * num_nodes
-        self.jmx_stats = [{} for x in range(num_nodes)]
-        self.maximum_jmx_value = {}  # map from object_attribute_name to maximum value observed over time
-        self.average_jmx_value = {}  # map from object_attribute_name to average value observed over time
-
-    def clean_node(self, node):
-        node.account.kill_process("jmx", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf /mnt/jmx_tool.log", allow_fail=False)
-
-    def start_jmx_tool(self, idx, node):
-        if self.started[idx-1] == True or self.jmx_object_names == None:
-            return
-        self.started[idx-1] = True
-
-        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.JmxTool " \
-              "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port
-        for jmx_object_name in self.jmx_object_names:
-            cmd += " --object-name %s" % jmx_object_name
-        for jmx_attribute in self.jmx_attributes:
-            cmd += " --attributes %s" % jmx_attribute
-        cmd += " | tee -a /mnt/jmx_tool.log"
-
-        self.logger.debug("Start JmxTool %d command: %s", idx, cmd)
-        jmx_output = node.account.ssh_capture(cmd, allow_fail=False)
-        jmx_output.next()
-
-    def read_jmx_output(self, idx, node):
-        if self.started[idx-1] == False:
-            return
-        self.maximum_jmx_value = {}
-        self.average_jmx_value = {}
-        object_attribute_names = []
-
-        cmd = "cat /mnt/jmx_tool.log"
-        self.logger.debug("Read jmx output %d command: %s", idx, cmd)
-        for line in node.account.ssh_capture(cmd, allow_fail=False):
-            if "time" in line:
-                object_attribute_names = line.strip()[1:-1].split("\",\"")[1:]
-                continue
-            stats = [float(field) for field in line.split(',')]
-            time_sec = int(stats[0]/1000)
-            self.jmx_stats[idx-1][time_sec] = {name : stats[i+1] for i, name in enumerate(object_attribute_names)}
-
-        # do not calculate average and maximum of jmx stats until we have read output from all nodes
-        if any(len(time_to_stats)==0 for time_to_stats in self.jmx_stats):
-            return
-
-        start_time_sec = min([min(time_to_stats.keys()) for time_to_stats in self.jmx_stats])
-        end_time_sec = max([max(time_to_stats.keys()) for time_to_stats in self.jmx_stats])
-
-        for name in object_attribute_names:
-            aggregates_per_time = []
-            for time_sec in xrange(start_time_sec, end_time_sec+1):
-                # assume that value is 0 if it is not read by jmx tool at the given time. This is appropriate for metrics such as bandwidth
-                values_per_node = [time_to_stats.get(time_sec, {}).get(name, 0) for time_to_stats in self.jmx_stats]
-                # assume that value is aggregated across nodes by sum. This is appropriate for metrics such as bandwidth
-                aggregates_per_time.append(sum(values_per_node))
-            self.average_jmx_value[name] = sum(aggregates_per_time)/len(aggregates_per_time)
-            self.maximum_jmx_value[name] = max(aggregates_per_time)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index 25911af..401d6f7 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -13,10 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kafkatest.services.performance.jmx_mixin import JmxMixin
+from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.performance import PerformanceService
 import itertools
 from kafkatest.utils.security_config import SecurityConfig
+from kafkatest.services.kafka.directory import kafka_dir
 
 class ProducerPerformanceService(JmxMixin, PerformanceService):
 
@@ -45,8 +46,13 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
 
     def _worker(self, idx, node):
         args = self.args.copy()
-        args.update({'bootstrap_servers': self.kafka.bootstrap_servers(), 'jmx_port': self.jmx_port, 'client_id': self.client_id})
-        cmd = "JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
+        args.update({
+            'bootstrap_servers': self.kafka.bootstrap_servers(),
+            'jmx_port': self.jmx_port,
+            'client_id': self.client_id,
+            'kafka_directory': kafka_dir(node)
+            })
+        cmd = "JMX_PORT=%(jmx_port)d /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
               "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
 
         self.security_config.setup_node(node)
@@ -73,19 +79,21 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
             }
         last = None
         producer_output = node.account.ssh_capture(cmd)
-        first_line = producer_output.next()
-        self.start_jmx_tool(idx, node)
-        for line in itertools.chain([first_line], producer_output):
-            if self.intermediate_stats:
-                try:
-                    self.stats[idx-1].append(parse_stats(line))
-                except:
-                    # Sometimes there are extraneous log messages
-                    pass
+        first_line = next(producer_output, None)
 
-            last = line
-        try:
-            self.results[idx-1] = parse_stats(last)
-        except:
-            raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last))
-        self.read_jmx_output(idx, node)
+        if first_line is not None:
+            self.start_jmx_tool(idx, node)
+            for line in itertools.chain([first_line], producer_output):
+                if self.intermediate_stats:
+                    try:
+                        self.stats[idx-1].append(parse_stats(line))
+                    except:
+                        # Sometimes there are extraneous log messages
+                        pass
+
+                last = line
+            try:
+                self.results[idx-1] = parse_stats(last)
+            except:
+                raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last))
+            self.read_jmx_output(idx, node)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/kafka.properties b/tests/kafkatest/services/templates/kafka.properties
deleted file mode 100644
index a7f6604..0000000
--- a/tests/kafkatest/services/templates/kafka.properties
+++ /dev/null
@@ -1,74 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
-
-
-broker.id={{ broker_id }}
-port=9092
-#host.name=localhost
-advertised.host.name={{ node.account.hostname }}
-#advertised.port=<port accessible by clients>
-{% if security_protocol == interbroker_security_protocol %}
-listeners={{ security_protocol }}://:{{ port }}
-advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:{{ port }}
-{% else %}
-listeners=PLAINTEXT://:9092,SSL://:9093
-advertised.listeners=PLAINTEXT://{{ node.account.hostname }}:9092,SSL://{{ node.account.hostname }}:9093
-{% endif %}
-num.network.threads=3
-num.io.threads=8
-socket.send.buffer.bytes=102400
-socket.receive.buffer.bytes=65536
-socket.request.max.bytes=104857600
-
-log.dirs=/mnt/kafka-logs
-num.partitions=1
-num.recovery.threads.per.data.dir=1
-#log.flush.interval.messages=10000
-#log.flush.interval.ms=1000
-log.retention.hours=168
-#log.retention.bytes=1073741824
-log.segment.bytes=1073741824
-log.retention.check.interval.ms=300000
-log.cleaner.enable=false
-
-zookeeper.connect={{ zk.connect_setting() }}
-zookeeper.connection.timeout.ms=2000
-
-{% if quota_config.quota_producer_default is defined and quota_config.quota_producer_default is not none %}
-quota.producer.default={{ quota_config.quota_producer_default }}
-{% endif %}
-
-{% if quota_config.quota_consumer_default is defined and quota_config.quota_consumer_default is not none %}
-quota.consumer.default={{ quota_config.quota_consumer_default }}
-{% endif %}
-
-{% if quota_config.quota_producer_bytes_per_second_overrides is defined and quota_config.quota_producer_bytes_per_second_overrides is not none %}
-quota.producer.bytes.per.second.overrides={{ quota_config.quota_producer_bytes_per_second_overrides }}
-{% endif %}
-
-{% if quota_config.quota_consumer_bytes_per_second_overrides is defined and quota_config.quota_consumer_bytes_per_second_overrides is not none %}
-quota.consumer.bytes.per.second.overrides={{ quota_config.quota_consumer_bytes_per_second_overrides }}
-{% endif %}
-
-security.inter.broker.protocol={{ interbroker_security_protocol }}
-ssl.keystore.location=/mnt/ssl/test.keystore.jks
-ssl.keystore.password=test-ks-passwd
-ssl.key.password=test-key-passwd
-ssl.keystore.type=JKS
-ssl.truststore.location=/mnt/ssl/test.truststore.jks
-ssl.truststore.password=test-ts-passwd
-ssl.truststore.type=JKS
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 7ae7988..a95a0d6 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -14,28 +14,49 @@
 # limitations under the License.
 
 from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
+from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2
 from kafkatest.utils.security_config import SecurityConfig
 
 import json
+import os
+import subprocess
+import time
 
 
 class VerifiableProducer(BackgroundThreadService):
+    PERSISTENT_ROOT = "/mnt/verifiable_producer"
+    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stdout")
+    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stderr")
+    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+    LOG_FILE = os.path.join(LOG_DIR, "verifiable_producer.log")
+    LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.properties")
 
-    CONFIG_FILE = "/mnt/verifiable_producer.properties"
     logs = {
-        "producer_log": {
-            "path": "/mnt/producer.log",
-            "collect_default": False}
-    }
-
-    def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, max_messages=-1, throughput=100000):
+        "verifiable_producer_stdout": {
+            "path": STDOUT_CAPTURE,
+            "collect_default": False},
+        "verifiable_producer_stderr": {
+            "path": STDERR_CAPTURE,
+            "collect_default": False},
+        "verifiable_producer_log": {
+            "path": LOG_FILE,
+            "collect_default": True}
+        }
+
+    def __init__(self, context, num_nodes, kafka, topic, security_protocol=SecurityConfig.PLAINTEXT, max_messages=-1, throughput=100000, version=TRUNK):
         super(VerifiableProducer, self).__init__(context, num_nodes)
+        self.log_level = "TRACE"
 
         self.kafka = kafka
         self.topic = topic
         self.max_messages = max_messages
         self.throughput = throughput
 
+        for node in self.nodes:
+            node.version = version
         self.acked_values = []
         self.not_acked_values = []
 
@@ -45,15 +66,24 @@ class VerifiableProducer(BackgroundThreadService):
         self.prop_file += str(self.security_config)
 
     def _worker(self, idx, node):
+        node.account.ssh("mkdir -p %s" % VerifiableProducer.PERSISTENT_ROOT, allow_fail=False)
+
+        # Create and upload log properties
+        log_config = self.render('tools_log4j.properties', log_file=VerifiableProducer.LOG_FILE)
+        node.account.create_file(VerifiableProducer.LOG4J_CONFIG, log_config)
+
         # Create and upload config file
         self.logger.info("verifiable_producer.properties:")
         self.logger.info(self.prop_file)
         node.account.create_file(VerifiableProducer.CONFIG_FILE, self.prop_file)
         self.security_config.setup_node(node)
 
-        cmd = self.start_cmd
+        cmd = self.start_cmd(node)
         self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd))
 
+
+        last_produced_time = time.time()
+        prev_msg = None
         for line in node.account.ssh_capture(cmd):
             line = line.strip()
 
@@ -68,9 +98,30 @@ class VerifiableProducer(BackgroundThreadService):
                     elif data["name"] == "producer_send_success":
                         self.acked_values.append(int(data["value"]))
 
-    @property
-    def start_cmd(self):
-        cmd = "/opt/kafka/bin/kafka-verifiable-producer.sh" \
+                        # Log information if there is a large gap between successively acknowledged messages
+                        t = time.time()
+                        time_delta_sec = t - last_produced_time
+                        if time_delta_sec > 2 and prev_msg is not None:
+                            self.logger.debug(
+                                "Time delta between successively acked messages is large: " +
+                                "delta_t_sec: %s, prev_message: %s, current_message: %s" % (str(time_delta_sec), str(prev_msg), str(data)))
+
+                        last_produced_time = t
+                        prev_msg = data
+
+    def start_cmd(self, node):
+
+        cmd = ""
+        if node.version <= LATEST_0_8_2:
+            # 0.8.2.X releases do not have VerifiableProducer.java, so cheat and add
+            # the tools jar from trunk to the classpath
+            cmd += "for file in /opt/%s/tools/build/libs/kafka-tools*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK
+            cmd += "for file in /opt/%s/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK
+            cmd += "export CLASSPATH; "
+
+        cmd += "export LOG_DIR=%s;" % VerifiableProducer.LOG_DIR
+        cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG
+        cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer" \
               " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
@@ -78,9 +129,20 @@ class VerifiableProducer(BackgroundThreadService):
             cmd += " --throughput %s" % str(self.throughput)
 
         cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
-        cmd += " 2>> /mnt/producer.log | tee -a /mnt/producer.log &"
+        cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
         return cmd
 
+    def pids(self, node):
+        try:
+            cmd = "ps ax | grep -i VerifiableProducer | grep java | grep -v grep | awk '{print $1}'"
+            pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
+            return pid_arr
+        except (subprocess.CalledProcessError, ValueError) as e:
+            return []
+
+    def alive(self, node):
+        return len(self.pids(node)) > 0
+
     @property
     def acked(self):
         with self.lock:
@@ -113,7 +175,7 @@ class VerifiableProducer(BackgroundThreadService):
 
     def clean_node(self, node):
         node.account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False)
-        node.account.ssh("rm -rf /mnt/producer.log /mnt/verifiable_producer.properties", allow_fail=False)
+        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
         self.security_config.clean_node(node)
 
     def try_parse_json(self, string):

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/zookeeper.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index 09bec35..9a9047c 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -16,6 +16,8 @@
 
 from ducktape.services.service import Service
 
+from kafkatest.services.kafka.directory import kafka_dir
+
 import subprocess
 import time
 
@@ -46,9 +48,9 @@ class ZookeeperService(Service):
         self.logger.info(config_file)
         node.account.create_file("/mnt/zookeeper.properties", config_file)
 
-        node.account.ssh(
-            "/opt/kafka/bin/zookeeper-server-start.sh /mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &"
-            % self.logs["zk_log"])
+        start_cmd = "/opt/%s/bin/zookeeper-server-start.sh " % kafka_dir(node)
+        start_cmd += "/mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &" % self.logs["zk_log"]
+        node.account.ssh(start_cmd)
 
         time.sleep(5)  # give it some time to start
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
new file mode 100644
index 0000000..aa2fe53
--- /dev/null
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+
+
+class ProduceConsumeValidateTest(Test):
+    """This class provides a shared template for tests which follow the common pattern of:
+
+        - produce to a topic in the background
+        - consume from that topic in the background
+        - run some logic, e.g. fail topic leader etc.
+        - perform validation
+    """
+
+    def __init__(self, test_context):
+        super(ProduceConsumeValidateTest, self).__init__(test_context=test_context)
+
+    def setup_producer_and_consumer(self):
+        raise NotImplementedError("Subclasses should implement this")
+
+    def start_producer_and_consumer(self):
+        # Start background producer and consumer
+        self.producer.start()
+        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=10,
+             err_msg="Producer failed to start in a reasonable amount of time.")
+        self.consumer.start()
+        wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=10,
+             err_msg="Consumer failed to start in a reasonable amount of time.")
+
+    def stop_producer_and_consumer(self):
+        for node in self.consumer.nodes:
+            if not self.consumer.alive(node):
+                self.logger.warn("Consumer on %s is not alive and probably should be." % str(node.account))
+        for node in self.producer.nodes:
+            if not self.producer.alive(node):
+                self.logger.warn("Producer on %s is not alive and probably should be." % str(node.account))
+
+        # Check that producer is still successfully producing
+        currently_acked = self.producer.num_acked
+        wait_until(lambda: self.producer.num_acked > currently_acked + 5, timeout_sec=10,
+             err_msg="Expected producer to still be producing.")
+
+        self.producer.stop()
+        self.consumer.wait()
+
+    def run_produce_consume_validate(self, core_test_action):
+        """Top-level template for simple produce/consume/validate tests."""
+
+        self.start_producer_and_consumer()
+        core_test_action()
+        self.stop_producer_and_consumer()
+        self.validate()
+
+    def validate(self):
+        """Check that each acked message was consumed."""
+
+        self.acked = self.producer.acked
+        self.not_acked = self.producer.not_acked
+
+        # Check produced vs consumed
+        self.consumed = self.consumer.messages_consumed[1]
+        self.logger.info("num consumed:  %d" % len(self.consumed))
+
+        success = True
+        msg = ""
+
+        if len(set(self.consumed)) != len(self.consumed):
+            # There are duplicates. This is ok, so report it but don't fail the test
+            msg += "There are duplicate messages in the log\n"
+
+        if not set(self.consumed).issuperset(set(self.acked)):
+            # Every acked message must appear in the logs. I.e. consumed messages must be superset of acked messages.
+            acked_minus_consumed = set(self.producer.acked) - set(self.consumed)
+            success = False
+
+            msg += "At least one acked message did not appear in the consumed messages. acked_minus_consumed: "
+            if len(acked_minus_consumed) < 20:
+                msg += str(acked_minus_consumed)
+            else:
+                for i in range(20):
+                    msg += str(acked_minus_consumed.pop()) + ", "
+                msg += "...plus " + str(len(acked_minus_consumed) - 20) + " more"
+
+        if not success:
+            # Collect all the data logs if there was a failure
+            self.mark_for_collect(self.kafka)
+
+        if not success:
+            self.mark_for_collect(self.producer)
+
+        assert success, msg
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/tests/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/quota_test.py b/tests/kafkatest/tests/quota_test.py
index 4ae2e08..6ba6aa7 100644
--- a/tests/kafkatest/tests/quota_test.py
+++ b/tests/kafkatest/tests/quota_test.py
@@ -14,18 +14,13 @@
 # limitations under the License.
 
 from ducktape.tests.test import Test
-from ducktape.utils.util import wait_until
 from ducktape.mark import parametrize
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.performance import ProducerPerformanceService
-from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+from kafkatest.services.console_consumer import ConsoleConsumer
 
-import random
-import signal
-import time
 
 class QuotaTest(Test):
     """
@@ -73,7 +68,10 @@ class QuotaTest(Test):
         """Override this since we're adding services outside of the constructor"""
         return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
 
-    def run_clients(self, producer_id, producer_num, consumer_id, consumer_num):
+    @parametrize(producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1)
+    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=1)
+    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=2)
+    def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1):
         # Produce all messages
         producer = ProducerPerformanceService(
             self.test_context, producer_num, self.kafka, security_protocol=self.security_protocol,
@@ -91,7 +89,7 @@ class QuotaTest(Test):
         consumer.run()
 
         for idx, messages in consumer.messages_consumed.iteritems():
-            assert len(messages)>0, "consumer %d didn't consume any message before timeout" % idx
+            assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx
 
         success, msg = self.validate(self.kafka, producer, consumer)
         assert success, msg
@@ -172,9 +170,3 @@ class QuotaTest(Test):
         if client_id in overridden_quotas:
             return float(overridden_quotas[client_id])
         return self.quota_config['quota_consumer_default']
-
-    @parametrize(producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1)
-    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=1)
-    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=2)
-    def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1):
-        self.run_clients(producer_id, producer_num, consumer_id, consumer_num)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/tests/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py
index d20cc22..16aa944 100644
--- a/tests/kafkatest/tests/replication_test.py
+++ b/tests/kafkatest/tests/replication_test.py
@@ -13,24 +13,76 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize
+
 from ducktape.mark import matrix
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 
 import signal
-import time
 
 
-class ReplicationTest(Test):
-    """Replication tests.
-    These tests verify that replication provides simple durability guarantees by checking that data acked by
-    brokers is still available for consumption in the face of various failure scenarios."""
+def clean_shutdown(test):
+    """Discover leader node for our topic and shut it down cleanly."""
+    test.kafka.signal_leader(test.topic, partition=0, sig=signal.SIGTERM)
+
+
+def hard_shutdown(test):
+    """Discover leader node for our topic and shut it down with a hard kill."""
+    test.kafka.signal_leader(test.topic, partition=0, sig=signal.SIGKILL)
+
+
+def clean_bounce(test):
+    """Chase the leader of one partition and restart it cleanly."""
+    for i in range(5):
+        prev_leader_node = test.kafka.leader(topic=test.topic, partition=0)
+        test.kafka.restart_node(prev_leader_node, clean_shutdown=True)
+
+
+def hard_bounce(test):
+    """Chase the leader and restart it with a hard kill."""
+    for i in range(5):
+        prev_leader_node = test.kafka.leader(topic=test.topic, partition=0)
+        test.kafka.signal_node(prev_leader_node, sig=signal.SIGKILL)
+
+        # Since this is a hard kill, we need to make sure the process is down and that
+        # zookeeper and the broker cluster have registered the loss of the leader.
+        # Waiting for a new leader to be elected on the topic-partition is a reasonable heuristic for this.
+
+        def leader_changed():
+            current_leader = test.kafka.leader(topic=test.topic, partition=0)
+            return current_leader is not None and current_leader != prev_leader_node
+
+        wait_until(lambda: len(test.kafka.pids(prev_leader_node)) == 0, timeout_sec=5)
+        wait_until(leader_changed, timeout_sec=10, backoff_sec=.5)
+        test.kafka.start_node(prev_leader_node)
+
+failures = {
+    "clean_shutdown": clean_shutdown,
+    "hard_shutdown": hard_shutdown,
+    "clean_bounce": clean_bounce,
+    "hard_bounce": hard_bounce
+}
+
+
+class ReplicationTest(ProduceConsumeValidateTest):
+    """
+    Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
+    (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
+    too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
+    ordering guarantees.
+
+    Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
+    we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
+
+    Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
+    consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
+    indicator that nothing is left to consume.
+    """
 
     def __init__(self, test_context):
         """:type test_context: ducktape.tests.test.TestContext"""
@@ -38,6 +90,11 @@ class ReplicationTest(Test):
 
         self.topic = "test_topic"
         self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
+                                                                    "partitions": 3,
+                                                                    "replication-factor": 3,
+                                                                    "min.insync.replicas": 2}
+                                                                })
         self.producer_throughput = 10000
         self.num_producers = 1
         self.num_consumers = 1
@@ -49,125 +106,27 @@ class ReplicationTest(Test):
         """Override this since we're adding services outside of the constructor"""
         return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
 
-    def run_with_failure(self, failure, interbroker_security_protocol):
-        """This is the top-level test template.
-
-        The steps are:
-            Produce messages in the background while driving some failure condition
-            When done driving failures, immediately stop producing
-            Consume all messages
-            Validate that messages acked by brokers were consumed
 
-        Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
-        (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
-        too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
-        ordering guarantees.
+    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
+            interbroker_security_protocol=["PLAINTEXT", "SSL"])
+    def test_replication_with_broker_failure(self, failure_mode, interbroker_security_protocol="PLAINTEXT"):
+        """Replication tests.
+        These tests verify that replication provides simple durability guarantees by checking that data acked by
+        brokers is still available for consumption in the face of various failure scenarios.
 
-        Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
-        we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
-
-        Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
-        consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
-        indicator that nothing is left to consume.
+        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
 
+            - Produce messages in the background
+            - Consume messages in the background
+            - Drive broker failures (shutdown, or bounce repeatedly with kill -15 or kill -9)
+            - When done driving failures, stop producing, and finish consuming
+            - Validate that every acked message was consumed
         """
-        security_protocol='PLAINTEXT'
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, 
-                                  security_protocol=security_protocol,
-                                  interbroker_security_protocol=interbroker_security_protocol,
-                                  topics={self.topic: {
-                                               "partitions": 3,
-                                               "replication-factor": 3,
-                                               "min.insync.replicas": 2}
-                                         })
+        client_security_protocol = 'PLAINTEXT'
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, security_protocol=client_security_protocol, throughput=self.producer_throughput)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, security_protocol=client_security_protocol, consumer_timeout_ms=60000, message_validator=is_int)
+
+        self.kafka.interbroker_security_protocol = interbroker_security_protocol
         self.kafka.start()
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, security_protocol=security_protocol, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, security_protocol=security_protocol, new_consumer=False, consumer_timeout_ms=3000, message_validator=is_int)
-
-        # Produce in a background thread while driving broker failures
-        self.producer.start()
-        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5,
-             err_msg="Producer failed to start in a reasonable amount of time.")
-        failure()
-        self.producer.stop()
-
-        self.acked = self.producer.acked
-        self.not_acked = self.producer.not_acked
-        self.logger.info("num not acked: %d" % self.producer.num_not_acked)
-        self.logger.info("num acked:     %d" % self.producer.num_acked)
-
-        # Consume all messages
-        self.consumer.start()
-        self.consumer.wait()
-        self.consumed = self.consumer.messages_consumed[1]
-        self.logger.info("num consumed:  %d" % len(self.consumed))
-
-        # Check produced vs consumed
-        success, msg = self.validate()
-
-        if not success:
-            self.mark_for_collect(self.producer)
-
-        assert success, msg
-
-    def clean_shutdown(self):
-        """Discover leader node for our topic and shut it down cleanly."""
-        self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGTERM)
-
-    def hard_shutdown(self):
-        """Discover leader node for our topic and shut it down with a hard kill."""
-        self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGKILL)
-
-    def clean_bounce(self):
-        """Chase the leader of one partition and restart it cleanly."""
-        for i in range(5):
-            prev_leader_node = self.kafka.leader(topic=self.topic, partition=0)
-            self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=True)
-
-    def hard_bounce(self):
-        """Chase the leader and restart it cleanly."""
-        for i in range(5):
-            prev_leader_node = self.kafka.leader(topic=self.topic, partition=0)
-            self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=False)
-
-            # Wait long enough for previous leader to probably be awake again
-            time.sleep(6)
-
-    def validate(self):
-        """Check that produced messages were consumed."""
-
-        success = True
-        msg = ""
-
-        if len(set(self.consumed)) != len(self.consumed):
-            # There are duplicates. This is ok, so report it but don't fail the test
-            msg += "There are duplicate messages in the log\n"
-
-        if not set(self.consumed).issuperset(set(self.acked)):
-            # Every acked message must appear in the logs. I.e. consumed messages must be superset of acked messages.
-            acked_minus_consumed = set(self.producer.acked) - set(self.consumed)
-            success = False
-            msg += "At least one acked message did not appear in the consumed messages. acked_minus_consumed: " + str(acked_minus_consumed)
-
-        if not success:
-            # Collect all the data logs if there was a failure
-            self.mark_for_collect(self.kafka)
-
-        return success, msg
-
-    
-    @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL'])
-    def test_clean_shutdown(self, interbroker_security_protocol):
-        self.run_with_failure(self.clean_shutdown, interbroker_security_protocol)
-
-    @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL'])
-    def test_hard_shutdown(self, interbroker_security_protocol):
-        self.run_with_failure(self.hard_shutdown, interbroker_security_protocol)
-
-    @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL'])
-    def test_clean_bounce(self, interbroker_security_protocol):
-        self.run_with_failure(self.clean_bounce, interbroker_security_protocol)
-
-    @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL'])
-    def test_hard_bounce(self, interbroker_security_protocol):
-        self.run_with_failure(self.hard_bounce, interbroker_security_protocol)
+        
+        self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/tests/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/upgrade_test.py b/tests/kafkatest/tests/upgrade_test.py
new file mode 100644
index 0000000..97605cd
--- /dev/null
+++ b/tests/kafkatest/tests/upgrade_test.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+from kafkatest.services.kafka import config_property
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+
+
+class TestUpgrade(ProduceConsumeValidateTest):
+    """Rolling-upgrade system test.
+
+    Brokers start on version 0.8.2 and are upgraded to TRUNK in two rolling
+    bounces while a producer and consumer run in the background; validation
+    (inherited from ProduceConsumeValidateTest) checks that every message
+    acked by the producer was consumed.
+    """
+
+    def __init__(self, test_context):
+        super(TestUpgrade, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        # Bring up zookeeper and a 3-node 0.8.2 broker cluster with a topic
+        # replicated across all three brokers.
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2, topics={self.topic: {
+                                                                    "partitions": 3,
+                                                                    "replication-factor": 3,
+                                                                    "min.insync.replicas": 2}})
+        self.zk.start()
+        self.kafka.start()
+
+        # Producer and consumer
+        # Both clients stay on the 0.8.2 version for the whole test; only the
+        # brokers are upgraded.
+        self.producer_throughput = 10000
+        self.num_producers = 1
+        self.num_consumers = 1
+        self.producer = VerifiableProducer(
+            self.test_context, self.num_producers, self.kafka, self.topic,
+            throughput=self.producer_throughput, version=LATEST_0_8_2)
+
+        # TODO - reduce the timeout
+        self.consumer = ConsoleConsumer(
+            self.test_context, self.num_consumers, self.kafka, self.topic,
+            consumer_timeout_ms=30000, message_validator=is_int, version=LATEST_0_8_2)
+
+    def perform_upgrade(self):
+        """Upgrade every broker to TRUNK via two rolling bounces.
+
+        First pass: bounce each broker onto the new version with
+        inter.broker.protocol.version pinned to 0.8.2.X, so upgraded brokers
+        keep speaking the old protocol to not-yet-upgraded peers.
+        Second pass: once all brokers run the new version, bounce each broker
+        again with the override removed so the cluster switches to the new
+        inter-broker protocol.
+        """
+        self.logger.info("First pass bounce - rolling upgrade")
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            node.version = TRUNK
+            node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = "0.8.2.X"
+            self.kafka.start_node(node)
+
+        self.logger.info("Second pass bounce - remove inter.broker.protocol.version config")
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
+            self.kafka.start_node(node)
+
+    def test_upgrade(self):
+        """Test upgrade of Kafka broker cluster from 0.8.2 to 0.9.0
+
+        - Start 3 node broker cluster on version 0.8.2
+        - Start producer and consumer in the background
+        - Perform two-phase rolling upgrade
+            - First phase: upgrade brokers to 0.9.0 with inter.broker.protocol.version set to 0.8.2.X
+            - Second phase: remove inter.broker.protocol.version config with rolling bounce
+        - Finally, validate that every message acked by the producer was consumed by the consumer
+        """
+
+        self.run_produce_consume_validate(core_test_action=self.perform_upgrade)
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/utils/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/__init__.py b/tests/kafkatest/utils/__init__.py
index cff6d2b..46c71f0 100644
--- a/tests/kafkatest/utils/__init__.py
+++ b/tests/kafkatest/utils/__init__.py
@@ -12,4 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
\ No newline at end of file
+# see kafka.server.KafkaConfig for additional details and defaults
+
+from util import kafkatest_version, is_version
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/utils/util.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py
new file mode 100644
index 0000000..2b1e49c
--- /dev/null
+++ b/tests/kafkatest/utils/util.py
@@ -0,0 +1,42 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest import __version__ as __kafkatest_version__
+
+import re
+
+
+def kafkatest_version():
+    """Return string representation of the current kafkatest version.
+
+    (The version string is sourced from kafkatest.__version__.)
+    """
+    return __kafkatest_version__
+
+
+def _kafka_jar_versions(proc_string):
+    """Use a rough heuristic to find all kafka versions explicitly in the process classpath.
+
+    Two regex passes are needed: one for jars named like
+    "kafka-<component>-<a.b.c.d>" (e.g. kafka-clients-0.8.2.2) and one for
+    plain "kafka-<a.b.c.d>" jars. Returns the set of distinct four-part
+    version strings found.
+    """
+    versions = re.findall("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string)
+    versions.extend(re.findall("kafka-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string))
+
+    return set(versions)
+
+
+def is_version(node, version_list, proc_grep_string="kafka"):
+    """Heuristic to check that only the specified versions appear in the classpath of the process.
+    A useful tool to aid in checking that service version apis are working correctly.
+
+    :param node: cluster node whose process list is inspected over ssh
+    :param version_list: iterable of expected versions; each is str()-ed and
+        compared against the versions found in the process classpath
+    :param proc_grep_string: pattern used to locate the process via ps | grep
+    """
+    # Exactly one matching process must be running on the node; a zero or
+    # multiple-match result indicates a broken test setup, so fail loudly.
+    lines = [l for l in node.account.ssh_capture("ps ax | grep %s | grep -v grep" % proc_grep_string)]
+    assert len(lines) == 1
+
+    versions = _kafka_jar_versions(lines[0])
+    return versions == {str(v) for v in version_list}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/setup.py
----------------------------------------------------------------------
diff --git a/tests/setup.py b/tests/setup.py
index d637eb8..f555fd3 100644
--- a/tests/setup.py
+++ b/tests/setup.py
@@ -14,15 +14,21 @@
 # limitations under the License.
 # see kafka.server.KafkaConfig for additional details and defaults
 
+import re
 from setuptools import find_packages, setup
 
+version = ''
+with open('kafkatest/__init__.py', 'r') as fd:
+    version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
+                        fd.read(), re.MULTILINE).group(1)
+
 setup(name="kafkatest",
-      version="0.9.0.dev0",
+      version=version,
       description="Apache Kafka System Tests",
       author="Apache Kafka",
       platforms=["any"], 
       license="apache2.0",
       packages=find_packages(),
       include_package_data=True,
-      install_requires=["ducktape==0.3.2"]
+      install_requires=["ducktape==0.3.8"]
       )

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index dd695cf..0cd90c0 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -23,12 +23,14 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.utils.Utils;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -138,7 +140,29 @@ public class VerifiableProducer {
 
         return parser;
     }
-
+    
+    /**
+     * Read a properties file from the given path
+     * @param filename The path of the file to read
+     * @return the properties parsed from the file
+     * @throws FileNotFoundException if no file exists at the given path
+     * @throws IOException if the file cannot be read
+     *
+     * Note: this duplication of org.apache.kafka.common.utils.Utils.loadProps is unfortunate 
+     * but *intentional*. In order to use VerifiableProducer in compatibility and upgrade tests, 
+     * we use VerifiableProducer from trunk tools package, and run it against 0.8.X.X kafka jars.
+     * Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate.
+     */
+    public static Properties loadProps(String filename) throws IOException, FileNotFoundException {
+        Properties props = new Properties();
+        InputStream propStream = null;
+        try {
+            propStream = new FileInputStream(filename);
+            props.load(propStream);
+        } finally {
+            // Close the stream even when props.load throws.
+            if (propStream != null)
+                propStream.close();
+        }
+        return props;
+    }
+    
     /** Construct a VerifiableProducer object from command-line arguments. */
     public static VerifiableProducer createFromArgs(String[] args) {
         ArgumentParser parser = argParser();
@@ -164,7 +188,7 @@ public class VerifiableProducer {
             producerProps.put("retries", "0");
             if (configFile != null) {
                 try {
-                    producerProps.putAll(Utils.loadProps(configFile));
+                    producerProps.putAll(loadProps(configFile));
                 } catch (IOException e) {
                     throw new ArgumentParserException(e.getMessage(), parser);
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/vagrant/base.sh
----------------------------------------------------------------------
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 133f10a..2c2e5c2 100644
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -38,9 +38,31 @@ if [ -z `which javac` ]; then
 fi
 
 chmod a+rw /opt
-if [ ! -e /opt/kafka ]; then
-    ln -s /vagrant /opt/kafka
+if [ -h /opt/kafka-trunk ]; then
+    # reset symlink
+    rm /opt/kafka-trunk
 fi
+ln -s /vagrant /opt/kafka-trunk
+
+get_kafka() {
+    # Download and unpack the given Apache Kafka release (Scala 2.10 build)
+    # into /opt/kafka-<version>, used by system tests that need to run older
+    # broker versions (e.g. rolling-upgrade tests).
+    version=$1
+
+    kafka_dir=/opt/kafka-$version
+    url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_2.10-$version.tgz
+    # Skip the download if this version was already unpacked on a previous
+    # provisioning run.
+    if [ ! -d /opt/kafka-$version ]; then
+        pushd /tmp
+        curl -O $url
+        file_tgz=`basename $url`
+        tar -xzf $file_tgz
+        rm -rf $file_tgz
+
+        # Rename the extracted "kafka_2.10-<version>" directory to its
+        # canonical /opt/kafka-<version> location.
+        file=`basename $file_tgz .tgz`
+        mv $file $kafka_dir
+        popd
+    fi
+}
+
+get_kafka 0.8.2.2
 
 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local
 # VMs, we can just create it if it doesn't exist and use it like we'd use


Mime
View raw message