kafka-commits mailing list archives

From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-2276; KIP-25 initial patch
Date Wed, 29 Jul 2015 00:21:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f4101ab3f -> e43c9aff9


http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/tests/benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/benchmark_test.py b/tests/kafkatest/tests/benchmark_test.py
new file mode 100644
index 0000000..b01f27b
--- /dev/null
+++ b/tests/kafkatest/tests/benchmark_test.py
@@ -0,0 +1,274 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService
+
+
+class Benchmark(KafkaTest):
+    '''A benchmark of Kafka producer/consumer performance. This replicates the test
+    run here:
+    https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
+    '''
+    def __init__(self, test_context):
+        super(Benchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
+            'test-rep-one' : { 'partitions': 6, 'replication-factor': 1 },
+            'test-rep-three' : { 'partitions': 6, 'replication-factor': 3 }
+        })
+
+        if True:
+            # Works on both aws and local
+            self.msgs = 1000000
+            self.msgs_default = 1000000
+        else:
+            # Can use locally on Vagrant VMs, but may use too much memory for aws
+            self.msgs = 50000000
+            self.msgs_default = 50000000
+
+        self.msgs_large = 10000000
+        self.msg_size_default = 100
+        self.batch_size = 8*1024
+        self.buffer_memory = 64*1024*1024
+        self.msg_sizes = [10, 100, 1000, 10000, 100000]
+        self.target_data_size = 128*1024*1024
+        self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)
+
+    def test_single_producer_no_replication(self):
+        self.logger.info("BENCHMARK: Single producer, no replication")
+        self.perf = ProducerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic="test-rep-one", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
+            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
+        )
+        self.perf.run()
+        data = compute_throughput(self.perf)
+        self.logger.info("Single producer, no replication: %s", str(data))
+        return data
+
+    def test_single_producer_replication(self):
+        self.logger.info("BENCHMARK: Single producer, async 3x replication")
+        self.perf = ProducerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
+            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
+        )
+        self.perf.run()
+        data = compute_throughput(self.perf)
+        self.logger.info("Single producer, async 3x replication: %s" % str(data))
+        return data
+
+    def test_single_producer_sync(self):
+        self.logger.info("BENCHMARK: Single producer, sync 3x replication")
+        self.perf = ProducerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
+            settings={'acks':-1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
+        )
+        self.perf.run()
+
+        data = compute_throughput(self.perf)
+        self.logger.info("Single producer, sync 3x replication: %s" % data)
+        return data
+
+    def test_three_producers_async(self):
+        self.logger.info("BENCHMARK: Three producers, async 3x replication")
+        self.perf = ProducerPerformanceService(
+            self.test_context, 3, self.kafka,
+            topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
+            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
+        )
+        self.perf.run()
+
+        data = compute_throughput(self.perf)
+        self.logger.info("Three producers, async 3x replication: %s" % data)
+        return data
+
+    def test_multiple_message_size(self):
+        # TODO this would be a great place to use parametrization
+        self.perfs = {}
+        for msg_size in self.msg_sizes:
+            self.logger.info("BENCHMARK: Message size %d (%f GB total, single producer, async 3x replication)", msg_size, self.target_data_size_gb)
+            # Always generate the same total amount of data
+            nrecords = int(self.target_data_size / msg_size)
+            self.perfs["perf-" + str(msg_size)] = ProducerPerformanceService(
+                self.test_context, 1, self.kafka,
+                topic="test-rep-three", num_records=nrecords, record_size=msg_size, throughput=-1,
+                settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+            )
+
+        self.msg_size_perf = {}
+        for msg_size in self.msg_sizes:
+            perf = self.perfs["perf-" + str(msg_size)]
+            perf.run()
+            self.msg_size_perf[msg_size] = perf
+
+        summary = ["Message size:"]
+        data = {}
+        for msg_size in self.msg_sizes:
+            datum = compute_throughput(self.msg_size_perf[msg_size])
+            summary.append(" %d: %s" % (msg_size, datum))
+            data[msg_size] = datum
+        self.logger.info("\n".join(summary))
+        return data
+
+    def test_long_term_throughput(self):
+        self.logger.info("BENCHMARK: Long production")
+        self.perf = ProducerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic="test-rep-three", num_records=self.msgs_large, record_size=self.msg_size_default, throughput=-1,
+            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory},
+            intermediate_stats=True
+        )
+        self.perf.run()
+
+        summary = ["Throughput over long run, data > memory:"]
+        data = {}
+        # FIXME we should be generating a graph too
+        # Try to break it into 5 blocks, but fall back to a smaller number if
+        # there aren't even 5 elements
+        block_size = max(len(self.perf.stats[0]) / 5, 1)
+        nblocks = len(self.perf.stats[0]) / block_size
+        for i in range(nblocks):
+            subset = self.perf.stats[0][i*block_size:min((i+1)*block_size, len(self.perf.stats[0]))]
+            if len(subset) == 0:
+                summary.append(" Time block %d: (empty)" % i)
+                data[i] = None
+            else:
+                records_per_sec = sum([stat['records_per_sec'] for stat in subset])/float(len(subset))
+                mb_per_sec = sum([stat['mbps'] for stat in subset])/float(len(subset))
+
+                summary.append(" Time block %d: %f rec/sec (%f MB/s)" % (i, records_per_sec, mb_per_sec))
+                data[i] = throughput(records_per_sec, mb_per_sec)
+
+        self.logger.info("\n".join(summary))
+        return data
+
+    def test_end_to_end_latency(self):
+        self.logger.info("BENCHMARK: End to end latency")
+        self.perf = EndToEndLatencyService(
+            self.test_context, 1, self.kafka,
+            topic="test-rep-three", num_records=10000
+        )
+        self.perf.run()
+
+        data = latency(self.perf.results[0]['latency_50th_ms'],  self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
+        self.logger.info("End-to-end latency: %s" % str(data))
+        return data
+
+    def test_producer_and_consumer(self):
+        self.logger.info("BENCHMARK: Producer + Consumer")
+        self.producer = ProducerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
+            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
+        )
+
+        self.consumer = ConsumerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
+        )
+
+        Service.run_parallel(self.producer, self.consumer)
+
+        data = {
+            "producer": compute_throughput(self.producer),
+            "consumer": compute_throughput(self.consumer)
+        }
+        summary = [
+            "Producer + consumer:",
+            str(data)]
+        self.logger.info("\n".join(summary))
+        return data
+
+    def test_single_consumer(self):
+        topic = "test-rep-three"
+
+        self.producer = ProducerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
+            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
+        )
+        self.producer.run()
+
+        # All consumer tests use the messages from the first benchmark, so
+        # they'll get messages of the default message size
+        self.logger.info("BENCHMARK: Single consumer")
+        self.perf = ConsumerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic=topic, num_records=self.msgs_default, throughput=-1, threads=1
+        )
+        self.perf.run()
+
+        data = compute_throughput(self.perf)
+        self.logger.info("Single consumer: %s" % data)
+        return data
+
+    def test_three_consumers(self):
+        topic = "test-rep-three"
+
+        self.producer = ProducerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
+            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
+        )
+        self.producer.run()
+
+        self.logger.info("BENCHMARK: Three consumers")
+        self.perf = ConsumerPerformanceService(
+            self.test_context, 3, self.kafka,
+            topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
+        )
+        self.perf.run()
+
+        data = compute_throughput(self.perf)
+        self.logger.info("Three consumers: %s", data)
+        return data
+
+
+def throughput(records_per_sec, mb_per_sec):
+    """Helper method to ensure uniform representation of throughput data"""
+    return {
+        "records_per_sec": records_per_sec,
+        "mb_per_sec": mb_per_sec
+    }
+
+
+def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
+    """Helper method to ensure uniform representation of latency data"""
+    return {
+        "latency_50th_ms": latency_50th_ms,
+        "latency_99th_ms": latency_99th_ms,
+        "latency_999th_ms": latency_999th_ms
+    }
+
+
+def compute_throughput(perf):
+    """Helper method for computing throughput after running a performance service."""
+    aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
+    aggregate_mbps = sum([r['mbps'] for r in perf.results])
+
+    return throughput(aggregate_rate, aggregate_mbps)
+
+
+
+
+
+
+
+
+
+
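For reference, the helper functions above reduce a performance service's per-node results to a single throughput figure. Below is a minimal sketch of that aggregation, assuming the compute_throughput helper defined above is in scope; FakePerf and the numbers are purely illustrative stand-ins for the results a ProducerPerformanceService or ConsumerPerformanceService would expose:

    # FakePerf is a hypothetical stand-in for a performance service object;
    # the real services expose `results` as a list of per-node dicts with
    # 'records_per_sec' and 'mbps' keys, as consumed by compute_throughput above.
    class FakePerf(object):
        def __init__(self, results):
            self.results = results

    perf = FakePerf([
        {'records_per_sec': 500000.0, 'mbps': 47.7},   # worker node 1
        {'records_per_sec': 480000.0, 'mbps': 45.8},   # worker node 2
    ])

    print(compute_throughput(perf))
    # {'records_per_sec': 980000.0, 'mb_per_sec': 93.5}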

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/tests/kafka_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/kafka_test.py b/tests/kafkatest/tests/kafka_test.py
new file mode 100644
index 0000000..7118721
--- /dev/null
+++ b/tests/kafkatest/tests/kafka_test.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+
+
+class KafkaTest(Test):
+    """
+    Helper class that manages setting up a Kafka cluster. Use this if the
+    default settings for Kafka are sufficient for your test; any customization
+    needs to be done manually. Your run() method should call tearDown and
+    setUp. The Zookeeper and Kafka services are available as the fields
+    KafkaTest.zk and KafkaTest.kafka.
+    """
+    def __init__(self, test_context, num_zk, num_brokers, topics=None):
+        super(KafkaTest, self).__init__(test_context)
+        self.num_zk = num_zk
+        self.num_brokers = num_brokers
+        self.topics = topics
+
+        self.zk = ZookeeperService(test_context, self.num_zk)
+
+        self.kafka = KafkaService(
+            test_context, self.num_brokers,
+            self.zk, topics=self.topics)
+
+    def setUp(self):
+        self.zk.start()
+        self.kafka.start()
\ No newline at end of file
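As the Benchmark class above illustrates, a test reuses this setup by subclassing KafkaTest and passing the desired cluster shape to the constructor. A minimal sketch, with a hypothetical topic name and node counts:

    from kafkatest.tests.kafka_test import KafkaTest

    class ExampleTest(KafkaTest):
        """Brings up 1 ZooKeeper node and 1 broker with a single 2-partition
        topic (names and counts here are illustrative only)."""

        def __init__(self, test_context):
            super(ExampleTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
                'example-topic': {'partitions': 2, 'replication-factor': 1}
            })

        def test_cluster_came_up(self):
            # setUp() above has already started self.zk and self.kafka
            assert self.num_brokers == 1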

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/tests/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py
new file mode 100644
index 0000000..fed1ea1
--- /dev/null
+++ b/tests/kafkatest/tests/replication_test.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+
+import signal
+import time
+
+
+class ReplicationTest(Test):
+    """Replication tests.
+    These tests verify that replication provides simple durability guarantees by checking that data acked by
+    brokers is still available for consumption in the face of various failure scenarios."""
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(ReplicationTest, self).__init__(test_context=test_context)
+
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
+                                                                    "partitions": 3,
+                                                                    "replication-factor": 3,
+                                                                    "min.insync.replicas": 2}
+                                                                })
+        self.producer_throughput = 10000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+    def setUp(self):
+        self.zk.start()
+        self.kafka.start()
+
+    def min_cluster_size(self):
+        """Override this since we're adding services outside of the constructor"""
+        return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+    def run_with_failure(self, failure):
+        """This is the top-level test template.
+
+        The steps are:
+            Produce messages in the background while driving some failure condition
+            When done driving failures, immediately stop producing
+            Consume all messages
+            Validate that messages acked by brokers were consumed
+
+        Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
+        (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
+        too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
+        ordering guarantees.
+
+        Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
+        we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
+
+        Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
+        consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
+        indicator that nothing is left to consume.
+
+        """
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000)
+
+        # Produce in a background thread while driving broker failures
+        self.producer.start()
+        if not wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5):
+            raise RuntimeError("Producer failed to start in a reasonable amount of time.")
+        failure()
+        self.producer.stop()
+
+        self.acked = self.producer.acked
+        self.not_acked = self.producer.not_acked
+        self.logger.info("num not acked: %d" % self.producer.num_not_acked)
+        self.logger.info("num acked:     %d" % self.producer.num_acked)
+
+        # Consume all messages
+        self.consumer.start()
+        self.consumer.wait()
+        self.consumed = self.consumer.messages_consumed[1]
+        self.logger.info("num consumed:  %d" % len(self.consumed))
+
+        # Check produced vs consumed
+        success, msg = self.validate()
+
+        if not success:
+            self.mark_for_collect(self.producer)
+
+        assert success, msg
+
+    def clean_shutdown(self):
+        """Discover leader node for our topic and shut it down cleanly."""
+        self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGTERM)
+
+    def hard_shutdown(self):
+        """Discover leader node for our topic and shut it down with a hard kill."""
+        self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGKILL)
+
+    def clean_bounce(self):
+        """Chase the leader of one partition and restart it cleanly."""
+        for i in range(5):
+            prev_leader_node = self.kafka.leader(topic=self.topic, partition=0)
+            self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=True)
+
+    def hard_bounce(self):
+        """Chase the leader and restart it cleanly."""
+        for i in range(5):
+            prev_leader_node = self.kafka.leader(topic=self.topic, partition=0)
+            self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=False)
+
+            # Wait long enough for previous leader to probably be awake again
+            time.sleep(6)
+
+    def validate(self):
+        """Check that produced messages were consumed."""
+
+        success = True
+        msg = ""
+
+        if len(set(self.consumed)) != len(self.consumed):
+            # There are duplicates. This is ok, so report it but don't fail the test
+            msg += "There are duplicate messages in the log\n"
+
+        if not set(self.consumed).issuperset(set(self.acked)):
+            # Every acked message must appear in the logs. I.e. consumed messages must be superset of acked messages.
+            acked_minus_consumed = set(self.producer.acked) - set(self.consumed)
+            success = False
+            msg += "At least one acked message did not appear in the consumed messages. acked_minus_consumed: " + str(acked_minus_consumed)
+
+        if not success:
+            # Collect all the data logs if there was a failure
+            self.mark_for_collect(self.kafka)
+
+        return success, msg
+
+    def test_clean_shutdown(self):
+        self.run_with_failure(self.clean_shutdown)
+
+    def test_hard_shutdown(self):
+        self.run_with_failure(self.hard_shutdown)
+
+    def test_clean_bounce(self):
+        self.run_with_failure(self.clean_bounce)
+
+    def test_hard_bounce(self):
+        self.run_with_failure(self.hard_bounce)
+
+
+
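The validate() method above boils down to a set comparison: duplicates in the consumed log are tolerated, but every acked message must appear among the consumed messages. A self-contained sketch of that check, with illustrative values:

    acked = [1, 2, 3, 4, 5]
    consumed = [1, 2, 2, 3, 4, 5]            # a duplicate is tolerated

    has_duplicates = len(set(consumed)) != len(consumed)
    acked_minus_consumed = set(acked) - set(consumed)

    assert not acked_minus_consumed, \
        "At least one acked message did not appear in the consumed messages: %s" % acked_minus_consumed
    print("duplicates present: %s; all acked messages were consumed" % has_duplicates)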

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/setup.py
----------------------------------------------------------------------
diff --git a/tests/setup.py b/tests/setup.py
new file mode 100644
index 0000000..5ce4bb7
--- /dev/null
+++ b/tests/setup.py
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+from setuptools import find_packages, setup
+
+setup(name="kafkatest",
+      version="0.8.3-SNAPSHOT",
+      description="Apache Kafka System Tests",
+      author="Apache Kafka",
+      platforms=["any"], 
+      license="apache2.0",
+      packages=find_packages(),
+      requires=["ducktape(>=0.2.0)"]
+      )

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
new file mode 100644
index 0000000..fd31c1a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.tools;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.*;
+
+public class ProducerPerformance {
+
+    public static void main(String[] args) throws Exception {
+        if (args.length < 4) {
+            System.err.println("USAGE: java " + ProducerPerformance.class.getName() +
+                               " topic_name num_records record_size target_records_sec [prop_name=prop_value]*");
+            System.exit(1);
+        }
+
+        /* parse args */
+        String topicName = args[0];
+        long numRecords = Long.parseLong(args[1]);
+        int recordSize = Integer.parseInt(args[2]);
+        int throughput = Integer.parseInt(args[3]);
+
+        Properties props = new Properties();
+        for (int i = 4; i < args.length; i++) {
+            String[] pieces = args[i].split("=");
+            if (pieces.length != 2)
+                throw new IllegalArgumentException("Invalid property: " + args[i]);
+            props.put(pieces[0], pieces[1]);
+        }
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
+
+        /* setup perf test */
+        byte[] payload = new byte[recordSize];
+        Arrays.fill(payload, (byte) 1);
+        ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);
+        Stats stats = new Stats(numRecords, 5000);
+        long startMs = System.currentTimeMillis();
+
+        ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
+        for (int i = 0; i < numRecords; i++) {
+            long sendStartMs = System.currentTimeMillis();
+            Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
+            producer.send(record, cb);
+
+            if (throttler.shouldThrottle(i, sendStartMs)) {
+                throttler.throttle();
+            }
+        }
+
+        /* print final results */
+        producer.close();
+        stats.printTotal();
+    }
+
+    private static class Stats {
+        private long start;
+        private long windowStart;
+        private int[] latencies;
+        private int sampling;
+        private int iteration;
+        private int index;
+        private long count;
+        private long bytes;
+        private int maxLatency;
+        private long totalLatency;
+        private long windowCount;
+        private int windowMaxLatency;
+        private long windowTotalLatency;
+        private long windowBytes;
+        private long reportingInterval;
+
+        public Stats(long numRecords, int reportingInterval) {
+            this.start = System.currentTimeMillis();
+            this.windowStart = System.currentTimeMillis();
+            this.index = 0;
+            this.iteration = 0;
+            this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
+            this.latencies = new int[(int) (numRecords / this.sampling) + 1];
+            this.maxLatency = 0;
+            this.totalLatency = 0;
+            this.windowCount = 0;
+            this.windowMaxLatency = 0;
+            this.windowTotalLatency = 0;
+            this.windowBytes = 0;
+            this.reportingInterval = reportingInterval;
+        }
+
+        public void record(int iter, int latency, int bytes, long time) {
+            this.count++;
+            this.bytes += bytes;
+            this.totalLatency += latency;
+            this.maxLatency = Math.max(this.maxLatency, latency);
+            this.windowCount++;
+            this.windowBytes += bytes;
+            this.windowTotalLatency += latency;
+            this.windowMaxLatency = Math.max(windowMaxLatency, latency);
+            if (iter % this.sampling == 0) {
+                this.latencies[index] = latency;
+                this.index++;
+            }
+            /* maybe report the recent perf */
+            if (time - windowStart >= reportingInterval) {
+                printWindow();
+                newWindow();
+            }
+        }
+
+        public Callback nextCompletion(long start, int bytes, Stats stats) {
+            Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
+            this.iteration++;
+            return cb;
+        }
+
+        public void printWindow() {
+            long elapsed = System.currentTimeMillis() - windowStart;
+            double recsPerSec = 1000.0 * windowCount / (double) elapsed;
+            double mbPerSec = 1000.0 * this.windowBytes / (double) elapsed / (1024.0 * 1024.0);
+            System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f max latency.\n",
+                              windowCount,
+                              recsPerSec,
+                              mbPerSec,
+                              windowTotalLatency / (double) windowCount,
+                              (double) windowMaxLatency);
+        }
+
+        public void newWindow() {
+            this.windowStart = System.currentTimeMillis();
+            this.windowCount = 0;
+            this.windowMaxLatency = 0;
+            this.windowTotalLatency = 0;
+            this.windowBytes = 0;
+        }
+
+        public void printTotal() {
+            long elapsed = System.currentTimeMillis() - start;
+            double recsPerSec = 1000.0 * count / (double) elapsed;
+            double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
+            int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
+            System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n",
+                              count,
+                              recsPerSec,
+                              mbPerSec,
+                              totalLatency / (double) count,
+                              (double) maxLatency,
+                              percs[0],
+                              percs[1],
+                              percs[2],
+                              percs[3]);
+        }
+
+        private static int[] percentiles(int[] latencies, int count, double... percentiles) {
+            int size = Math.min(count, latencies.length);
+            Arrays.sort(latencies, 0, size);
+            int[] values = new int[percentiles.length];
+            for (int i = 0; i < percentiles.length; i++) {
+                int index = (int) (percentiles[i] * size);
+                values[i] = latencies[index];
+            }
+            return values;
+        }
+    }
+
+    private static final class PerfCallback implements Callback {
+        private final long start;
+        private final int iteration;
+        private final int bytes;
+        private final Stats stats;
+
+        public PerfCallback(int iter, long start, int bytes, Stats stats) {
+            this.start = start;
+            this.stats = stats;
+            this.iteration = iter;
+            this.bytes = bytes;
+        }
+
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            long now = System.currentTimeMillis();
+            int latency = (int) (now - start);
+            this.stats.record(iteration, latency, bytes, now);
+            if (exception != null)
+                exception.printStackTrace();
+        }
+    }
+
+}
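The Stats class above keeps latency percentiles tractable by sampling: it records roughly every Nth latency so that at most ~500,000 samples are retained, then sorts the samples and indexes into them at each requested percentile. A rough Python rendering of that scheme, with made-up latencies, included only to make the reporting math easier to follow:

    def percentiles(latencies, count, *percs):
        # Mirror of Stats.percentiles above: sort the sampled window and index
        # into it at each requested percentile.
        size = min(count, len(latencies))
        window = sorted(latencies[:size])
        return [window[int(p * size)] for p in percs]

    num_records = 2000000
    sampling = num_records // min(num_records, 500000)           # keep every 4th latency here
    sampled = [i % 50 for i in range(0, num_records, sampling)]  # fake latencies in ms

    p50, p99, p999 = percentiles(sampled, len(sampled), 0.50, 0.99, 0.999)
    print(p50, p99, p999)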

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tools/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java b/tools/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java
new file mode 100644
index 0000000..06c443f
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.tools;
+
+
+/**
+ * This class helps producers throttle throughput.
+ * 
+ * If targetThroughput >= 0, the resulting average throughput will be approximately
+ * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
+ * no throttling will occur. 
+ * 
+ * To use, do this between successive send attempts:
+ * <pre>
+ *     {@code     
+ *      if (throttler.shouldThrottle(...)) {
+ *          throttler.throttle();
+ *      } 
+ *     } 
+ * </pre>
+ * 
+ * Note that this can be used to throttle message throughput or data throughput.
+ */
+public class ThroughputThrottler {
+    
+    private static final long NS_PER_MS = 1000000L;
+    private static final long NS_PER_SEC = 1000 * NS_PER_MS;
+    private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
+
+    long sleepTimeNs;
+    long sleepDeficitNs = 0;
+    long targetThroughput = -1;
+    long startMs;
+
+    /**
+     * @param targetThroughput Can be messages/sec or bytes/sec
+     * @param startMs          When the very first message is sent
+     */
+    public ThroughputThrottler(long targetThroughput, long startMs) {
+        this.startMs = startMs;
+        this.targetThroughput = targetThroughput;
+        this.sleepTimeNs = targetThroughput > 0 ?
+                           NS_PER_SEC / targetThroughput : 
+                           Long.MAX_VALUE;
+    }
+
+    /**
+     * @param amountSoFar bytes produced so far if you want to throttle data throughput, or
+     *                    messages produced so far if you want to throttle message throughput.
+     * @param sendStartMs timestamp of the most recently sent message
+     * @return true if the caller should invoke throttle() to stay at or below the target throughput
+     */
+    public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
+        if (this.targetThroughput < 0) {
+            // No throttling in this case
+            return false;
+        }
+
+        float elapsedSec = (sendStartMs - startMs) / 1000.f;
+        return elapsedSec > 0 && (amountSoFar / elapsedSec) > this.targetThroughput;
+    }
+
+    /**
+     * Occasionally blocks for small amounts of time to achieve targetThroughput.
+     * 
+     * Note that if targetThroughput is 0, this will block extremely aggressively.
+     */
+    public void throttle() {
+        if (targetThroughput == 0) {
+            try {
+                Thread.sleep(Long.MAX_VALUE);
+            } catch (InterruptedException e) {
+                // do nothing
+            }
+            return;
+        }
+        
+        // throttle throughput by sleeping, on average,
+        // (1 / this.throughput) seconds between "things sent"
+        sleepDeficitNs += sleepTimeNs;
+
+        // If enough sleep deficit has accumulated, sleep a little
+        if (sleepDeficitNs >= MIN_SLEEP_NS) {
+            long sleepMs = sleepDeficitNs / 1000000;
+            long sleepNs = sleepDeficitNs - sleepMs * 1000000;
+
+            long sleepStartNs = System.nanoTime();
+            try {
+                Thread.sleep(sleepMs, (int) sleepNs);
+                sleepDeficitNs = 0;
+            } catch (InterruptedException e) {
+                // If sleep is cut short, reduce deficit by the amount of
+                // time we actually spent sleeping
+                long sleepElapsedNs = System.nanoTime() - sleepStartNs;
+                if (sleepElapsedNs <= sleepDeficitNs) {
+                    sleepDeficitNs -= sleepElapsedNs;
+                }
+            }
+        }
+    }
+}
+    
+    
\ No newline at end of file
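The throttler above works by sleeping, on average, 1/targetThroughput seconds per send, but it accumulates a "sleep deficit" and only actually sleeps once at least ~2 ms is owed, so individual sleeps stay coarse enough to be accurate. A simplified, self-contained Python sketch of the same idea (the driver loop and no-op send are illustrative, not part of the patch):

    import time

    NS_PER_SEC = 1000000000
    MIN_SLEEP_NS = 2 * 1000000          # 2 ms floor, matching MIN_SLEEP_NS above

    def run_throttled(target_per_sec, total, send):
        sleep_time_ns = NS_PER_SEC // target_per_sec   # average gap between sends
        deficit_ns = 0
        start = time.time()
        for i in range(total):
            send(i)
            elapsed_sec = time.time() - start
            # Equivalent of shouldThrottle(): only slow down when the observed
            # rate is above the target.
            if elapsed_sec > 0 and i / elapsed_sec > target_per_sec:
                deficit_ns += sleep_time_ns
                # Equivalent of throttle(): sleep only once enough deficit has
                # accumulated to make a sleep worthwhile.
                if deficit_ns >= MIN_SLEEP_NS:
                    time.sleep(deficit_ns / float(NS_PER_SEC))
                    deficit_ns = 0

    run_throttled(1000, 5000, lambda i: None)   # ~5 seconds of no-op "sends"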

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
new file mode 100644
index 0000000..b04876f
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.tools;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+
+/**
+ * Primarily intended for use with system testing, this producer prints metadata
+ * in the form of JSON to stdout on each "send" request. For example, this helps
+ * with end-to-end correctness tests by making externally visible which messages have been
+ * acked and which have not.
+ *
+ * When used as a command-line tool, it produces increasing integers. It will produce a 
+ * fixed number of messages unless the default max-messages -1 is used, in which case
+ * it produces indefinitely.
+ *  
+ * If logging is left enabled, log output on stdout can be easily ignored by checking
+ * whether a given line is valid JSON.
+ */
+public class VerifiableProducer {
+    
+    String topic;
+    private Producer<String, String> producer;
+    // If maxMessages < 0, produce until the process is killed externally
+    private long maxMessages = -1;
+    
+    // Number of messages for which acks were received
+    private long numAcked = 0;
+    
+    // Number of send attempts
+    private long numSent = 0;
+    
+    // Throttle message throughput if this is set >= 0
+    private long throughput;
+    
+    // Hook to trigger producing thread to stop sending messages
+    private boolean stopProducing = false;
+
+    public VerifiableProducer(
+            Properties producerProps, String topic, int throughput, int maxMessages) {
+
+        this.topic = topic;
+        this.throughput = throughput;
+        this.maxMessages = maxMessages;
+        this.producer = new KafkaProducer<String, String>(producerProps);
+    }
+
+    /** Get the command-line argument parser. */
+    private static ArgumentParser argParser() {
+        ArgumentParser parser = ArgumentParsers
+                .newArgumentParser("verifiable-producer")
+                .defaultHelp(true)
+                .description("This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each \"send\" request, making externally visible which messages have been acked and which have not.");
+
+        parser.addArgument("--topic")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("TOPIC")
+                .help("Produce messages to this topic.");
+
+        parser.addArgument("--broker-list")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
+                .dest("brokerList")
+                .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
+        
+        parser.addArgument("--max-messages")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .metavar("MAX-MESSAGES")
+                .dest("maxMessages")
+                .help("Produce this many messages. If -1, produce messages until the process is killed externally.");
+
+        parser.addArgument("--throughput")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .metavar("THROUGHPUT")
+                .help("If set >= 0, throttle maximum message throughput to *approximately* THROUGHPUT messages/sec.");
+
+        parser.addArgument("--acks")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .choices(0, 1, -1)
+                .metavar("ACKS")
+                .help("Acks required on each produced message. See Kafka docs on request.required.acks for details.");
+
+        return parser;
+    }
+  
+    /** Construct a VerifiableProducer object from command-line arguments. */
+    public static VerifiableProducer createFromArgs(String[] args) {
+        ArgumentParser parser = argParser();
+        VerifiableProducer producer = null;
+        
+        try {
+            Namespace res;
+            res = parser.parseArgs(args);
+
+            int maxMessages = res.getInt("maxMessages");
+            String topic = res.getString("topic");
+            int throughput = res.getInt("throughput");
+
+            Properties producerProps = new Properties();
+            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
+            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                              "org.apache.kafka.common.serialization.StringSerializer");
+            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                              "org.apache.kafka.common.serialization.StringSerializer");
+            producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks")));
+            // No producer retries
+            producerProps.put("retries", "0");
+
+            producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages);
+        } catch (ArgumentParserException e) {
+            if (args.length == 0) {
+                parser.printHelp();
+                System.exit(0);
+            } else {
+                parser.handleError(e);
+                System.exit(1);
+            }
+        }
+        
+        return producer;
+    }
+  
+    /** Produce a message with given key and value. */
+    public void send(String key, String value) {
+        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
+        numSent++;
+        try {
+            producer.send(record, new PrintInfoCallback(key, value));
+        } catch (Exception e) {
+
+            synchronized (System.out) {
+                System.out.println(errorString(e, key, value, System.currentTimeMillis()));
+            }
+        }
+    }
+  
+    /** Close the producer to flush any remaining messages. */
+    public void close() {
+        producer.close();
+    }
+  
+    /**
+     * Return JSON string encapsulating basic information about the exception, as well
+     * as the key and value which triggered the exception.
+     */
+    String errorString(Exception e, String key, String value, Long nowMs) {
+        assert e != null : "Expected non-null exception.";
+
+        Map<String, Object> errorData = new HashMap<>();
+        errorData.put("class", this.getClass().toString());
+        errorData.put("name", "producer_send_error");
+
+        errorData.put("time_ms", nowMs);
+        errorData.put("exception", e.getClass().toString());
+        errorData.put("message", e.getMessage());
+        errorData.put("topic", this.topic);
+        errorData.put("key", key);
+        errorData.put("value", value);
+        
+        return toJsonString(errorData);
+    }
+  
+    String successString(RecordMetadata recordMetadata, String key, String value, Long nowMs) {
+        assert recordMetadata != null : "Expected non-null recordMetadata object.";
+
+        Map<String, Object> successData = new HashMap<>();
+        successData.put("class", this.getClass().toString());
+        successData.put("name", "producer_send_success");
+
+        successData.put("time_ms", nowMs);
+        successData.put("topic", this.topic);
+        successData.put("partition", recordMetadata.partition());
+        successData.put("offset", recordMetadata.offset());
+        successData.put("key", key);
+        successData.put("value", value);
+        
+        return toJsonString(successData);
+    }
+    
+    private String toJsonString(Map<String, Object> data) {
+        String json;
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+            json = mapper.writeValueAsString(data);
+        } catch (JsonProcessingException e) {
+            json = "Bad data can't be written as json: " + e.getMessage();
+        }
+        return json;
+    }
+  
+    /** Callback which prints a JSON success or error record to stdout when a send completes. */
+    private class PrintInfoCallback implements Callback {
+        
+        private String key;
+        private String value;
+    
+        PrintInfoCallback(String key, String value) {
+            this.key = key;
+            this.value = value;
+        }
+    
+        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+            synchronized (System.out) {
+                if (e == null) {
+                    VerifiableProducer.this.numAcked++;
+                    System.out.println(successString(recordMetadata, this.key, this.value, System.currentTimeMillis()));
+                } else {
+                    System.out.println(errorString(e, this.key, this.value, System.currentTimeMillis()));
+                }
+            }
+        }
+    }
+  
+    public static void main(String[] args) throws IOException {
+        
+        final VerifiableProducer producer = createFromArgs(args);
+        final long startMs = System.currentTimeMillis();
+        boolean infinite = producer.maxMessages < 0;
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                // Trigger main thread to stop producing messages
+                producer.stopProducing = true;
+                
+                // Flush any remaining messages
+                producer.close();
+
+                // Print a summary
+                long stopMs = System.currentTimeMillis();
+                double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
+                
+                Map<String, Object> data = new HashMap<>();
+                data.put("class", producer.getClass().toString());
+                data.put("name", "tool_data");
+                data.put("sent", producer.numSent);
+                data.put("acked", producer.numAcked);
+                data.put("target_throughput", producer.throughput);
+                data.put("avg_throughput", avgThroughput);
+                
+                System.out.println(producer.toJsonString(data));
+            }
+        });
+
+        ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
+        for (int i = 0; i < producer.maxMessages || infinite; i++) {
+            if (producer.stopProducing) {
+                break;
+            }
+            long sendStartMs = System.currentTimeMillis();
+            producer.send(null, String.format("%d", i));
+            
+            if (throttler.shouldThrottle(i, sendStartMs)) {
+                throttler.throttle();
+            }
+        }
+    }
+        
+}
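Because VerifiableProducer emits one JSON object per send result on stdout, a test driver can separate its output from ordinary log noise by trying to parse each line as JSON, as the class javadoc above suggests. A minimal sketch of such a consumer of this output; the sample lines are fabricated for illustration:

    import json

    lines = [
        "[2015-07-28 17:00:00,000] INFO some ordinary log output",
        '{"name": "producer_send_success", "topic": "test_topic", "partition": 0, "offset": 0, "key": null, "value": "0"}',
        '{"name": "producer_send_error", "topic": "test_topic", "message": "timed out", "key": null, "value": "1"}',
    ]

    acked, errors = [], []
    for line in lines:
        try:
            event = json.loads(line)
        except ValueError:
            continue                    # not JSON -> plain log output, ignore it
        if event.get("name") == "producer_send_success":
            acked.append(event["value"])
        elif event.get("name") == "producer_send_error":
            errors.append(event["value"])

    print("acked: %s, errors: %s" % (acked, errors))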

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/vagrant/aws/aws-access-keys-commands
----------------------------------------------------------------------
diff --git a/vagrant/aws/aws-access-keys-commands b/vagrant/aws/aws-access-keys-commands
new file mode 100644
index 0000000..9c923f8
--- /dev/null
+++ b/vagrant/aws/aws-access-keys-commands
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+if [ -z "$AWS_IAM" ]; then
+    echo "Warning: AWS_IAM is not set"
+fi
+
+export AWS_ACCESS_KEY=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/$AWS_IAM | grep AccessKeyId | awk -F\" '{ print $4 }'`
+export AWS_SECRET_KEY=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/$AWS_IAM | grep SecretAccessKey | awk -F\" '{ print $4 }'`
+export AWS_SESSION_TOKEN=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/$AWS_IAM | grep Token | awk -F\" '{ print $4 }'`
+
+if [ -z "$AWS_ACCESS_KEY" ]; then
+    echo "Failed to populate environment variables AWS_ACCESS_KEY, AWS_SECRET_KEY, and AWS_SESSION_TOKEN."
+    echo "AWS_IAM is currently $AWS_IAM. Double-check that this is correct. If not set, add this command to your .bashrc file:"
+    echo "export AWS_IAM=<my_aws_iam>  # put this into your ~/.bashrc"
+fi

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/vagrant/aws/aws-example-Vagrantfile.local
----------------------------------------------------------------------
diff --git a/vagrant/aws/aws-example-Vagrantfile.local b/vagrant/aws/aws-example-Vagrantfile.local
new file mode 100644
index 0000000..c3b075b
--- /dev/null
+++ b/vagrant/aws/aws-example-Vagrantfile.local
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# Use this template Vagrantfile.local for running system tests on AWS.
+# To use it, copy it to the base kafka directory, rename it to
+# Vagrantfile.local, and adjust the variables as needed.
+ec2_instance_type = "m3.medium"
+num_zookeepers = 0
+num_brokers = 0
+num_workers = 9
+ec2_keypair_name = 'kafkatest'
+ec2_keypair_file = '../kafkatest.pem'
+ec2_security_groups = ['kafkatest']
+ec2_region = 'us-west-2'
+ec2_ami = "ami-29ebb519"

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/vagrant/aws/aws-init.sh
----------------------------------------------------------------------
diff --git a/vagrant/aws/aws-init.sh b/vagrant/aws/aws-init.sh
new file mode 100755
index 0000000..6151928
--- /dev/null
+++ b/vagrant/aws/aws-init.sh
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+#!/bin/bash
+
+# This script can be used to set up a driver machine on aws from which you will run tests
+# or bring up your mini Kafka cluster.
+
+# Install dependencies
+sudo apt-get install -y maven openjdk-6-jdk build-essential \
+            ruby-dev zlib1g-dev realpath python-setuptools
+
+base_dir=`dirname $0`/../..
+
+if [ -z `which vagrant` ]; then
+    echo "Installing vagrant..."
+    wget https://dl.bintray.com/mitchellh/vagrant/vagrant_1.7.2_x86_64.deb
+    sudo dpkg -i vagrant_1.7.2_x86_64.deb
+    rm -f vagrant_1.7.2_x86_64.deb
+fi
+
+# Install necessary vagrant plugins
+# Note: Do NOT install vagrant-cachier since it doesn't work on AWS and only
+# adds log noise
+vagrant_plugins="vagrant-aws vagrant-hostmanager"
+existing=`vagrant plugin list`
+for plugin in $vagrant_plugins; do
+    echo $existing | grep $plugin > /dev/null
+    if [ $? != 0 ]; then
+        vagrant plugin install $plugin
+    fi
+done
+
+# Create Vagrantfile.local as a convenience
+if [ ! -e "$base_dir/Vagrantfile.local" ]; then
+    cp $base_dir/vagrant/aws/aws-example-Vagrantfile.local $base_dir/Vagrantfile.local
+fi
+
+gradle="gradle-2.2.1"
+if [ -z `which gradle` ] && [ ! -d $base_dir/$gradle ]; then
+    if [ ! -e $gradle-bin.zip ]; then
+        wget https://services.gradle.org/distributions/$gradle-bin.zip
+    fi
+    unzip $gradle-bin.zip
+    rm -rf $gradle-bin.zip
+    mv $gradle $base_dir/$gradle
+fi
+
+# Ensure aws access keys are in the environment when we use a EC2 driver machine
+LOCAL_HOSTNAME=$(hostname -d)
+if [[ ${LOCAL_HOSTNAME} =~ .*\.compute\.internal ]]; then
+  grep "AWS ACCESS KEYS" ~/.bashrc > /dev/null
+  if [ $? != 0 ]; then
+    echo "# --- AWS ACCESS KEYS ---" >> ~/.bashrc
+    echo ". `realpath $base_dir/aws/aws-access-keys-commands`" >> ~/.bashrc
+    echo "# -----------------------" >> ~/.bashrc
+    source ~/.bashrc
+  fi
+fi
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/vagrant/base.sh
----------------------------------------------------------------------
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 6f28dfe..133f10a 100644
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -41,3 +41,12 @@ chmod a+rw /opt
 if [ ! -e /opt/kafka ]; then
     ln -s /vagrant /opt/kafka
 fi
+
+# For EC2 nodes, we want to use /mnt, which should have the local disk. On local
+# VMs, we can just create it if it doesn't exist and use it like we'd use
+# /tmp. Eventually, we'd like to also support more directories, e.g. when EC2
+# instances have multiple local disks.
+if [ ! -e /mnt ]; then
+    mkdir /mnt
+fi
+chmod a+rwx /mnt

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/vagrant/system-test-Vagrantfile.local
----------------------------------------------------------------------
diff --git a/vagrant/system-test-Vagrantfile.local b/vagrant/system-test-Vagrantfile.local
new file mode 100644
index 0000000..7f280a4
--- /dev/null
+++ b/vagrant/system-test-Vagrantfile.local
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# Use this example Vagrantfile.local for running system tests
+# To use it, move it to the base kafka directory and rename
+# it to Vagrantfile.local
+num_zookeepers = 0
+num_brokers = 0
+num_workers = 9

