kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3214: Added system tests for compressed topics
Date Fri, 26 Feb 2016 21:40:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4542b027c -> 4e0ae79d5


KAFKA-3214: Added system tests for compressed topics

Added CompressionTest that tests 4 producers, each using a different compression type and
one not using compression.

Enabled VerifiableProducer to run producers with different compression types (passed in the
constructor). This includes enabling each producer to output unique values, so that the verification
process in ProduceConsumeValidateTest is correct (counts acks from all producers).

Also a fix for console consumer to raise an exception if it sees the incorrect consumer output
(before we swallowed an exception, so was hard to debug the issue).

Author: Anna Povzner <anna@confluent.io>

Reviewers: Geoff Anderson, Jason Gustafson

Closes #958 from apovzner/kafka-3214


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4e0ae79d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4e0ae79d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4e0ae79d

Branch: refs/heads/trunk
Commit: 4e0ae79d5c9aec04c014aca55cd4823f89b933c8
Parents: 4542b02
Author: Anna Povzner <anna@confluent.io>
Authored: Fri Feb 26 13:40:39 2016 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Fri Feb 26 13:40:39 2016 -0800

----------------------------------------------------------------------
 tests/kafkatest/services/console_consumer.py    | 10 ---
 tests/kafkatest/services/verifiable_producer.py | 56 ++++++++++---
 tests/kafkatest/tests/compatibility_test.py     |  3 +-
 tests/kafkatest/tests/compression_test.py       | 85 ++++++++++++++++++++
 tests/kafkatest/tests/mirror_maker_test.py      |  3 +-
 .../kafkatest/tests/reassign_partitions_test.py |  3 +-
 tests/kafkatest/tests/replication_test.py       |  3 +-
 .../tests/security_rolling_upgrade_test.py      |  3 +-
 tests/kafkatest/tests/upgrade_test.py           |  3 +-
 .../tests/zookeeper_security_upgrade_test.py    |  5 +-
 tests/kafkatest/utils/__init__.py               |  2 +-
 tests/kafkatest/utils/util.py                   | 31 +++++++
 .../apache/kafka/tools/VerifiableProducer.java  | 29 ++++++-
 13 files changed, 204 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4e0ae79d/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 2124fc4..a3bc2fd 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -26,16 +26,6 @@ import os
 import subprocess
 
 
-def is_int(msg):
-    """Default method used to check whether text pulled from console consumer is a message.
-
-    return int or None
-    """
-    try:
-        return int(msg)
-    except ValueError:
-        return None
-
 """
 0.8.2.1 ConsoleConsumer options
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e0ae79d/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 945f54b..f2ea421 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -17,6 +17,7 @@ from ducktape.services.background_thread import BackgroundThreadService
 
 from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
 from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2
+from kafkatest.utils import is_int, is_int_with_prefix
 
 import json
 import os
@@ -24,7 +25,6 @@ import signal
 import subprocess
 import time
 
-
 class VerifiableProducer(BackgroundThreadService):
     PERSISTENT_ROOT = "/mnt/verifiable_producer"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stdout")
@@ -42,40 +42,65 @@ class VerifiableProducer(BackgroundThreadService):
             "collect_default": True}
         }
 
-    def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000,
version=TRUNK):
+    def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000,
+                 message_validator=is_int, compression_types=None, version=TRUNK):
+        """
+        :param max_messages is a number of messages to be produced per producer
+        :param message_validator checks for an expected format of messages produced. There
are
+        currently two:
+               * is_int is an integer format; this is default and expected to be used if
+               num_nodes = 1
+               * is_int_with_prefix recommended if num_nodes > 1, because otherwise each
producer
+               will produce exactly same messages, and validation may miss missing messages.
+        :param compression_types: If None, all producers will not use compression; or a list
of one or
+        more compression types (including "none"). Each producer will pick a compression
type
+        from the list in round-robin fashion. Example: compression_types = ["none", "snappy"]
and
+        num_nodes = 3, then producer 1 and 2 will not use compression, and producer 3 will
use
+        compression type = snappy. If in this example, num_nodes is 1, then first (and only)
+        producer will not use compression.
+        """
         super(VerifiableProducer, self).__init__(context, num_nodes)
 
         self.kafka = kafka
         self.topic = topic
         self.max_messages = max_messages
         self.throughput = throughput
+        self.message_validator = message_validator
+        self.compression_types = compression_types
 
         for node in self.nodes:
             node.version = version
         self.acked_values = []
         self.not_acked_values = []
+        self.produced_count = {}
         self.prop_file = ""
 
-
     def _worker(self, idx, node):
         node.account.ssh("mkdir -p %s" % VerifiableProducer.PERSISTENT_ROOT, allow_fail=False)
 
         # Create and upload log properties
         self.security_config = self.kafka.security_config.client_config(self.prop_file)
-        self.prop_file += str(self.security_config)
+        producer_prop_file = str(self.security_config)
         log_config = self.render('tools_log4j.properties', log_file=VerifiableProducer.LOG_FILE)
         node.account.create_file(VerifiableProducer.LOG4J_CONFIG, log_config)
 
         # Create and upload config file
+        if self.compression_types is not None:
+            compression_index = (idx - 1) % len(self.compression_types)
+            self.logger.info("VerifiableProducer (index = %d) will use compression type =
%s", idx,
+                             self.compression_types[compression_index])
+            producer_prop_file += "\ncompression.type=%s\n" % self.compression_types[compression_index]
+
         self.logger.info("verifiable_producer.properties:")
-        self.logger.info(self.prop_file)
-        node.account.create_file(VerifiableProducer.CONFIG_FILE, self.prop_file)
+        self.logger.info(producer_prop_file)
+        node.account.create_file(VerifiableProducer.CONFIG_FILE, producer_prop_file)
         self.security_config.setup_node(node)
 
-        cmd = self.start_cmd(node)
+        cmd = self.start_cmd(node, idx)
         self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd))
 
 
+        self.produced_count[idx] = 0
         last_produced_time = time.time()
         prev_msg = None
         for line in node.account.ssh_capture(cmd):
@@ -87,10 +112,12 @@ class VerifiableProducer(BackgroundThreadService):
                 with self.lock:
                     if data["name"] == "producer_send_error":
                         data["node"] = idx
-                        self.not_acked_values.append(int(data["value"]))
+                        self.not_acked_values.append(self.message_validator(data["value"]))
+                        self.produced_count[idx] += 1
 
                     elif data["name"] == "producer_send_success":
-                        self.acked_values.append(int(data["value"]))
+                        self.acked_values.append(self.message_validator(data["value"]))
+                        self.produced_count[idx] += 1
 
                         # Log information if there is a large gap between successively acknowledged
messages
                         t = time.time()
@@ -103,7 +130,7 @@ class VerifiableProducer(BackgroundThreadService):
                         last_produced_time = t
                         prev_msg = data
 
-    def start_cmd(self, node):
+    def start_cmd(self, node, idx):
 
         cmd = ""
         if node.version <= LATEST_0_8_2:
@@ -122,6 +149,8 @@ class VerifiableProducer(BackgroundThreadService):
             cmd += " --max-messages %s" % str(self.max_messages)
         if self.throughput > 0:
             cmd += " --throughput %s" % str(self.throughput)
+        if self.message_validator == is_int_with_prefix:
+            cmd += " --value-prefix %s" % str(idx)
 
         cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
         cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
@@ -166,6 +195,13 @@ class VerifiableProducer(BackgroundThreadService):
         with self.lock:
             return len(self.not_acked_values)
 
+    def each_produced_at_least(self, count):
+        with self.lock:
+            for idx in range(1, self.num_nodes):
+                if self.produced_count.get(idx) is None or self.produced_count[idx] <
count:
+                    return False
+            return True
+
     def stop_node(self, node):
         self.kill_node(node, clean_shutdown=False, allow_fail=False)
         if self.worker_threads is None:

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e0ae79d/tests/kafkatest/tests/compatibility_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/compatibility_test.py b/tests/kafkatest/tests/compatibility_test.py
index 47e2752..bc00b29 100644
--- a/tests/kafkatest/tests/compatibility_test.py
+++ b/tests/kafkatest/tests/compatibility_test.py
@@ -18,7 +18,8 @@ from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK
 from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.utils import is_int
 
 
 class ClientCompatibilityTest(Test):

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e0ae79d/tests/kafkatest/tests/compression_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/compression_test.py b/tests/kafkatest/tests/compression_test.py
new file mode 100644
index 0000000..0de53ae
--- /dev/null
+++ b/tests/kafkatest/tests/compression_test.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int_with_prefix
+
+class CompressionTest(ProduceConsumeValidateTest):
+    """
+    These tests validate produce / consume for compressed topics.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(CompressionTest, self).__init__(test_context=test_context)
+
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic:
{
+                                                                    "partitions": 10,
+                                                                    "replication-factor":
1}})
+        self.num_partitions = 10
+        self.timeout_sec = 60
+        self.producer_throughput = 1000
+        self.num_producers = 4
+        self.messages_per_producer = 1000
+        self.num_consumers = 1
+
+    def setUp(self):
+        self.zk.start()
+
+    def min_cluster_size(self):
+        # Override this since we're adding services outside of the constructor
+        return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+    @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=True)
+    @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=False)
+    def test_compressed_topic(self, compression_types, new_consumer):
+        """Test produce => consume => validate for compressed topics
+        Setup: 1 zk, 1 kafka node, 1 topic with partitions=10, replication-factor=1
+
+        compression_types parameter gives a list of compression types (or no compression
if
+        "none"). Each producer in a VerifiableProducer group (num_producers = 4) will use
a
+        compression type from the list based on producer's index in the group.
+
+            - Produce messages in the background
+            - Consume messages in the background
+            - Stop producing, and finish consuming
+            - Validate that every acked message was consumed
+        """
+
+        self.kafka.security_protocol = "PLAINTEXT"
+        self.kafka.interbroker_security_protocol = self.kafka.security_protocol
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int_with_prefix,
+                                           compression_types=compression_types)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
self.topic,
+                                        new_consumer=new_consumer, consumer_timeout_ms=60000,
+                                        message_validator=is_int_with_prefix)
+        self.kafka.start()
+
+        self.run_produce_consume_validate(lambda: wait_until(
+            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+            timeout_sec=120, backoff_sec=1,
+            err_msg="Producer did not produce all messages in reasonable amount of time"))
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e0ae79d/tests/kafkatest/tests/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/mirror_maker_test.py b/tests/kafkatest/tests/mirror_maker_test.py
index 0244f81..afb1972 100644
--- a/tests/kafkatest/tests/mirror_maker_test.py
+++ b/tests/kafkatest/tests/mirror_maker_test.py
@@ -18,11 +18,12 @@ from ducktape.mark import parametrize, matrix, ignore
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.mirror_maker import MirrorMaker
 from kafkatest.services.security.minikdc import MiniKdc
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
 
 import time
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e0ae79d/tests/kafkatest/tests/reassign_partitions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/reassign_partitions_test.py b/tests/kafkatest/tests/reassign_partitions_test.py
index 0ac12f5..fc0b9d6 100644
--- a/tests/kafkatest/tests/reassign_partitions_test.py
+++ b/tests/kafkatest/tests/reassign_partitions_test.py
@@ -19,8 +19,9 @@ from ducktape.utils.util import wait_until
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
 import random
 
 class ReassignPartitionsTest(ProduceConsumeValidateTest):

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e0ae79d/tests/kafkatest/tests/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py
index 4909a9a..7b360ab 100644
--- a/tests/kafkatest/tests/replication_test.py
+++ b/tests/kafkatest/tests/replication_test.py
@@ -20,8 +20,9 @@ from ducktape.mark import matrix
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
 
 import signal
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e0ae79d/tests/kafkatest/tests/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security_rolling_upgrade_test.py b/tests/kafkatest/tests/security_rolling_upgrade_test.py
index f19d6b8..fdbedca 100644
--- a/tests/kafkatest/tests/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/security_rolling_upgrade_test.py
@@ -18,7 +18,8 @@ from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.utils import is_int
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from ducktape.mark import matrix
 from kafkatest.services.security.kafka_acls import ACLs

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e0ae79d/tests/kafkatest/tests/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/upgrade_test.py b/tests/kafkatest/tests/upgrade_test.py
index ea6f7ac..20be4f2 100644
--- a/tests/kafkatest/tests/upgrade_test.py
+++ b/tests/kafkatest/tests/upgrade_test.py
@@ -17,9 +17,10 @@ from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK
 from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka import config_property
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
 
 
 class TestUpgrade(ProduceConsumeValidateTest):

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e0ae79d/tests/kafkatest/tests/zookeeper_security_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/zookeeper_security_upgrade_test.py
index 3bfc478..7f80deb 100644
--- a/tests/kafkatest/tests/zookeeper_security_upgrade_test.py
+++ b/tests/kafkatest/tests/zookeeper_security_upgrade_test.py
@@ -18,10 +18,11 @@ from ducktape.mark import matrix
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.services.security.kafka_acls import ACLs
+from kafkatest.utils import is_int
 import time
 
 class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
@@ -113,4 +114,4 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
         self.create_producer_and_consumer()
 
         #Run upgrade
-        self.run_produce_consume_validate(self.run_zk_migration)
\ No newline at end of file
+        self.run_produce_consume_validate(self.run_zk_migration)

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e0ae79d/tests/kafkatest/utils/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/__init__.py b/tests/kafkatest/utils/__init__.py
index 46c71f0..ec224d8 100644
--- a/tests/kafkatest/utils/__init__.py
+++ b/tests/kafkatest/utils/__init__.py
@@ -14,4 +14,4 @@
 # limitations under the License.
 # see kafka.server.KafkaConfig for additional details and defaults
 
-from util import kafkatest_version, is_version
\ No newline at end of file
+from util import kafkatest_version, is_version, is_int, is_int_with_prefix

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e0ae79d/tests/kafkatest/utils/util.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py
index 2b1e49c..0b10dbf 100644
--- a/tests/kafkatest/utils/util.py
+++ b/tests/kafkatest/utils/util.py
@@ -40,3 +40,34 @@ def is_version(node, version_list, proc_grep_string="kafka"):
     versions = _kafka_jar_versions(lines[0])
     return versions == {str(v) for v in version_list}
 
+
+def is_int(msg):
+    """Method used to check whether the given message is an integer
+
+    return int or raises an exception if message is not an integer
+    """
+    try:
+        return int(msg)
+    except ValueError:
+        raise Exception("Unexpected message format (expected an integer). Message: %s" %
(msg))
+
+
+def is_int_with_prefix(msg):
+    """
+    Method used check whether the given message is of format 'integer_prefix'.'integer_value'
+
+    :param msg: message to validate
+    :return: msg or raises an exception is a message is of wrong format
+    """
+    try:
+        parts = msg.split(".")
+        if len(parts) != 2:
+            raise Exception("Unexpected message format. Message should be of format: integer
"
+                            "prefix dot integer value. Message: %s" % (msg))
+        int(parts[0])
+        int(parts[1])
+        return msg
+    except ValueError:
+        raise Exception("Unexpected message format. Message should be of format: integer
"
+                        "prefix dot integer value, but one of the two parts (before or after
dot) "
+                        "are not integers. Message: %s" % (msg))

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e0ae79d/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index 0cd90c0..9b10a9f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -74,13 +74,18 @@ public class VerifiableProducer {
     // Hook to trigger producing thread to stop sending messages
     private boolean stopProducing = false;
 
+    // Prefix (plus a dot separator) added to every value produced by verifiable producer
+    // if null, then values are produced without a prefix
+    private Integer valuePrefix;
+
     public VerifiableProducer(
-            Properties producerProps, String topic, int throughput, int maxMessages) {
+            Properties producerProps, String topic, int throughput, int maxMessages, Integer
valuePrefix) {
 
         this.topic = topic;
         this.throughput = throughput;
         this.maxMessages = maxMessages;
         this.producer = new KafkaProducer<String, String>(producerProps);
+        this.valuePrefix = valuePrefix;
     }
 
     /** Get the command-line argument parser. */
@@ -138,6 +143,14 @@ public class VerifiableProducer {
                 .metavar("CONFIG_FILE")
                 .help("Producer config properties file.");
 
+        parser.addArgument("--value-prefix")
+            .action(store())
+            .required(false)
+            .type(Integer.class)
+            .metavar("VALUE-PREFIX")
+            .dest("valuePrefix")
+            .help("If specified, each produced value will have this prefix with a dot separator");
+
         return parser;
     }
     
@@ -176,6 +189,7 @@ public class VerifiableProducer {
             String topic = res.getString("topic");
             int throughput = res.getInt("throughput");
             String configFile = res.getString("producer.config");
+            Integer valuePrefix = res.getInt("valuePrefix");
 
             Properties producerProps = new Properties();
             producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
@@ -194,7 +208,7 @@ public class VerifiableProducer {
                 }
             }
 
-            producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages);
+            producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages,
valuePrefix);
         } catch (ArgumentParserException e) {
             if (args.length == 0) {
                 parser.printHelp();
@@ -222,6 +236,14 @@ public class VerifiableProducer {
         }
     }
 
+    /** Returns a string to publish: ether 'valuePrefix'.'val' or 'val' **/
+    public String getValue(long val) {
+        if (this.valuePrefix != null) {
+            return String.format("%d.%d", this.valuePrefix.intValue(), val);
+        }
+        return String.format("%d", val);
+    }
+
     /** Close the producer to flush any remaining messages. */
     public void close() {
         producer.close();
@@ -337,7 +359,8 @@ public class VerifiableProducer {
                 break;
             }
             long sendStartMs = System.currentTimeMillis();
-            producer.send(null, String.format("%d", i));
+
+            producer.send(null, producer.getValue(i));
 
             if (throttler.shouldThrottle(i, sendStartMs)) {
                 throttler.throttle();


Mime
View raw message