kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7597: Add transaction support to ProduceBenchWorker (#5885)
Date Tue, 27 Nov 2018 20:50:17 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9368743  KAFKA-7597: Add transaction support to ProduceBenchWorker (#5885)
9368743 is described below

commit 9368743b8fd2b42a41b44860ea0f3588bb273cc8
Author: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
AuthorDate: Tue Nov 27 20:49:53 2018 +0000

    KAFKA-7597: Add transaction support to ProduceBenchWorker (#5885)
    
    KAFKA-7597: Add configurable transaction support to ProduceBenchWorker.  In order to get
    support for serializing Optional<> types to JSON, add a new library: jackson-datatype-jdk8.
    Once Jackson 3 comes out, this library will not be needed.
    
    Reviewers: Colin McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
---
 build.gradle                                       |   5 +
 gradle/dependencies.gradle                         |   1 +
 .../bin/trogdor-run-transactional-produce-bench.sh |  51 ++++++++++
 .../services/trogdor/produce_bench_workload.py     |   4 +-
 tests/kafkatest/tests/core/produce_bench_test.py   |  29 +++++-
 .../org/apache/kafka/trogdor/common/JsonUtil.java  |   2 +
 .../kafka/trogdor/workload/ProduceBenchSpec.java   |  37 +++++++
 .../kafka/trogdor/workload/ProduceBenchWorker.java | 112 +++++++++++++++++----
 .../trogdor/workload/TransactionGenerator.java     |  43 ++++++++
 .../workload/UniformTransactionsGenerator.java     |  57 +++++++++++
 .../trogdor/common/JsonSerializationTest.java      |   3 +-
 11 files changed, 318 insertions(+), 26 deletions(-)

diff --git a/build.gradle b/build.gradle
index 4d514df..5ce648a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -565,6 +565,7 @@ project(':core') {
   dependencies {
     compile project(':clients')
     compile libs.jacksonDatabind
+    compile libs.jacksonJDK8Datatypes
     compile libs.joptSimple
     compile libs.metrics
     compile libs.scalaLibrary
@@ -830,6 +831,7 @@ project(':clients') {
     compile libs.snappy
     compile libs.slf4jApi
     compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing
+    compileOnly libs.jacksonJDK8Datatypes
 
     jacksonDatabindConfig libs.jacksonDatabind // to publish as provided scope dependency.
 
@@ -839,6 +841,7 @@ project(':clients') {
 
     testRuntime libs.slf4jlog4j
     testRuntime libs.jacksonDatabind
+    testRuntime libs.jacksonJDK8Datatypes
   }
 
   task determineCommitId {
@@ -918,6 +921,7 @@ project(':tools') {
     compile project(':log4j-appender')
     compile libs.argparse4j
     compile libs.jacksonDatabind
+    compile libs.jacksonJDK8Datatypes
     compile libs.slf4jApi
 
     compile libs.jacksonJaxrsJsonProvider
@@ -1347,6 +1351,7 @@ project(':connect:json') {
   dependencies {
     compile project(':connect:api')
     compile libs.jacksonDatabind
+    compile libs.jacksonJDK8Datatypes
     compile libs.slf4jApi
 
     testCompile libs.easymock
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 7dd3604..59f56fc 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -103,6 +103,7 @@ libs += [
   bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix",
   easymock: "org.easymock:easymock:$versions.easymock",
   jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
+  jacksonJDK8Datatypes: "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$versions.jackson",
   jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson",
   jaxbApi: "javax.xml.bind:jaxb-api:$versions.jaxb",
   jaxrsApi: "javax.ws.rs:javax.ws.rs-api:$versions.jaxrs",
diff --git a/tests/bin/trogdor-run-transactional-produce-bench.sh b/tests/bin/trogdor-run-transactional-produce-bench.sh
new file mode 100755
index 0000000..fd5ff0a
--- /dev/null
+++ b/tests/bin/trogdor-run-transactional-produce-bench.sh
@@ -0,0 +1,51 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+COORDINATOR_ENDPOINT="localhost:8889"
+TASK_ID="produce_bench_$RANDOM"
+TASK_SPEC=$(
+cat <<EOF
+{
+    "id": "$TASK_ID",
+    "spec": {
+        "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
+        "durationMs": 10000000,
+        "producerNode": "node0",
+        "bootstrapServers": "localhost:9092",
+        "targetMessagesPerSec": 100,
+        "maxMessages": 500,
+        "transactionGenerator" : {
+          "type" : "uniform",
+          "messagesPerTransaction" : 50
+        },
+        "activeTopics": {
+            "foo[1-3]": {
+                "numPartitions": 3,
+                "replicationFactor": 1
+            }
+        },
+        "inactiveTopics": {
+            "foo[4-5]": {
+                "numPartitions": 3,
+                "replicationFactor": 1
+            }
+        }
+    }
+}
+EOF
+)
+./bin/trogdor.sh client --create-task "${TASK_SPEC}" "${COORDINATOR_ENDPOINT}"
+echo "\$TASK_ID = $TASK_ID"
diff --git a/tests/kafkatest/services/trogdor/produce_bench_workload.py b/tests/kafkatest/services/trogdor/produce_bench_workload.py
index cf6a962..9afc814 100644
--- a/tests/kafkatest/services/trogdor/produce_bench_workload.py
+++ b/tests/kafkatest/services/trogdor/produce_bench_workload.py
@@ -21,7 +21,8 @@ from kafkatest.services.trogdor.task_spec import TaskSpec
 class ProduceBenchWorkloadSpec(TaskSpec):
     def __init__(self, start_ms, duration_ms, producer_node, bootstrap_servers,
                  target_messages_per_sec, max_messages, producer_conf, admin_client_conf,
-                 common_client_conf, inactive_topics, active_topics):
+                 common_client_conf, inactive_topics, active_topics,
+                 transaction_generator=None):
         super(ProduceBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
         self.message["class"] = "org.apache.kafka.trogdor.workload.ProduceBenchSpec"
         self.message["producerNode"] = producer_node
@@ -29,6 +30,7 @@ class ProduceBenchWorkloadSpec(TaskSpec):
         self.message["targetMessagesPerSec"] = target_messages_per_sec
         self.message["maxMessages"] = max_messages
         self.message["producerConf"] = producer_conf
+        self.message["transactionGenerator"] = transaction_generator
         self.message["adminClientConf"] = admin_client_conf
         self.message["commonClientConf"] = common_client_conf
         self.message["inactiveTopics"] = inactive_topics
diff --git a/tests/kafkatest/tests/core/produce_bench_test.py b/tests/kafkatest/tests/core/produce_bench_test.py
index 125ee94..a316520 100644
--- a/tests/kafkatest/tests/core/produce_bench_test.py
+++ b/tests/kafkatest/tests/core/produce_bench_test.py
@@ -31,6 +31,8 @@ class ProduceBenchTest(Test):
         self.workload_service = ProduceBenchWorkloadService(test_context, self.kafka)
         self.trogdor = TrogdorService(context=self.test_context,
                                       client_services=[self.kafka, self.workload_service])
+        self.active_topics = {"produce_bench_topic[0-1]": {"numPartitions": 1, "replicationFactor": 3}}
+        self.inactive_topics = {"produce_bench_topic[2-9]": {"numPartitions": 1, "replicationFactor": 3}}
 
     def setUp(self):
         self.trogdor.start()
@@ -43,8 +45,6 @@ class ProduceBenchTest(Test):
         self.zk.stop()
 
     def test_produce_bench(self):
-        active_topics={"produce_bench_topic[0-1]":{"numPartitions":1, "replicationFactor":3}}
-        inactive_topics={"produce_bench_topic[2-9]":{"numPartitions":1, "replicationFactor":3}}
         spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
                                         self.workload_service.producer_node,
                                         self.workload_service.bootstrap_servers,
@@ -53,8 +53,29 @@ class ProduceBenchTest(Test):
                                         producer_conf={},
                                         admin_client_conf={},
                                         common_client_conf={},
-                                        inactive_topics=inactive_topics,
-                                        active_topics=active_topics)
+                                        inactive_topics=self.inactive_topics,
+                                        active_topics=self.active_topics)
+        workload1 = self.trogdor.create_task("workload1", spec)
+        workload1.wait_for_done(timeout_sec=360)
+        tasks = self.trogdor.tasks()
+        self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
+
+    def test_produce_bench_transactions(self):
+        spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+                                        self.workload_service.producer_node,
+                                        self.workload_service.bootstrap_servers,
+                                        target_messages_per_sec=1000,
+                                        max_messages=100000,
+                                        producer_conf={},
+                                        admin_client_conf={},
+                                        common_client_conf={},
+                                        inactive_topics=self.inactive_topics,
+                                        active_topics=self.active_topics,
+                                        transaction_generator={
+                                            # 10 transactions with 10k messages
+                                            "type": "uniform",
+                                            "messagesPerTransaction": "10000"
+                                        })
         workload1 = self.trogdor.create_task("workload1", spec)
         workload1.wait_for_done(timeout_sec=360)
         tasks = self.trogdor.tasks()
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java b/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java
index 70193c3..ad90ffc 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
 
 /**
  * Utilities for working with JSON.
@@ -33,6 +34,7 @@ public class JsonUtil {
         JSON_SERDE = new ObjectMapper();
         JSON_SERDE.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
         JSON_SERDE.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
+        JSON_SERDE.registerModule(new Jdk8Module());
         JSON_SERDE.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
     }
 
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index 30878bf..c0bbd7e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -26,10 +26,39 @@ import org.apache.kafka.trogdor.task.TaskWorker;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 /**
  * The specification for a benchmark that produces messages to a set of topics.
+ *
+ * To configure a transactional producer, a #{@link TransactionGenerator} must be passed in.
+ * Said generator works in lockstep with the producer by instructing it what action to take next in regards to a transaction.
+ *
+ * An example JSON representation which will result in a producer that creates three topics (foo1, foo2, foo3)
+ * with three partitions each and produces to them:
+ * #{@code
+ *   {
+ *      "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
+ *      "durationMs": 10000000,
+ *      "producerNode": "node0",
+ *      "bootstrapServers": "localhost:9092",
+ *      "targetMessagesPerSec": 10,
+ *      "maxMessages": 100,
+ *      "activeTopics": {
+ *        "foo[1-3]": {
+ *          "numPartitions": 3,
+ *          "replicationFactor": 1
+ *        }
+ *      },
+ *      "inactiveTopics": {
+ *        "foo[4-5]": {
+ *          "numPartitions": 3,
+ *          "replicationFactor": 1
+ *        }
+ *      }
+ *   }
+ * }
  */
 public class ProduceBenchSpec extends TaskSpec {
     private final String producerNode;
@@ -38,6 +67,7 @@ public class ProduceBenchSpec extends TaskSpec {
     private final int maxMessages;
     private final PayloadGenerator keyGenerator;
     private final PayloadGenerator valueGenerator;
+    private final Optional<TransactionGenerator> transactionGenerator;
     private final Map<String, String> producerConf;
     private final Map<String, String> adminClientConf;
     private final Map<String, String> commonClientConf;
@@ -53,6 +83,7 @@ public class ProduceBenchSpec extends TaskSpec {
                          @JsonProperty("maxMessages") int maxMessages,
                          @JsonProperty("keyGenerator") PayloadGenerator keyGenerator,
                          @JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
+                         @JsonProperty("transactionGenerator") Optional<TransactionGenerator> txGenerator,
                          @JsonProperty("producerConf") Map<String, String> producerConf,
                          @JsonProperty("commonClientConf") Map<String, String> commonClientConf,
                          @JsonProperty("adminClientConf") Map<String, String> adminClientConf,
@@ -67,6 +98,7 @@ public class ProduceBenchSpec extends TaskSpec {
             new SequentialPayloadGenerator(4, 0) : keyGenerator;
         this.valueGenerator = valueGenerator == null ?
             new ConstantPayloadGenerator(512, new byte[0]) : valueGenerator;
+        this.transactionGenerator = txGenerator == null ? Optional.empty() : txGenerator;
         this.producerConf = configOrEmptyMap(producerConf);
         this.commonClientConf = configOrEmptyMap(commonClientConf);
         this.adminClientConf = configOrEmptyMap(adminClientConf);
@@ -107,6 +139,11 @@ public class ProduceBenchSpec extends TaskSpec {
     }
 
     @JsonProperty
+    public Optional<TransactionGenerator> transactionGenerator() {
+        return transactionGenerator;
+    }
+
+    @JsonProperty
     public Map<String, String> producerConf() {
         return producerConf;
     }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index dc749eb..abf5976 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -36,6 +36,7 @@ import org.apache.kafka.trogdor.common.ThreadUtils;
 import org.apache.kafka.trogdor.common.WorkerUtils;
 import org.apache.kafka.trogdor.task.TaskWorker;
 import org.apache.kafka.trogdor.task.WorkerStatusTracker;
+import org.apache.kafka.trogdor.workload.TransactionGenerator.TransactionAction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,13 +44,16 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class ProduceBenchWorker implements TaskWorker {
     private static final Logger log = LoggerFactory.getLogger(ProduceBenchWorker.class);
@@ -179,18 +183,33 @@ public class ProduceBenchWorker implements TaskWorker {
 
         private final PayloadIterator values;
 
+        private final Optional<TransactionGenerator> transactionGenerator;
+
         private final Throttle throttle;
 
+        private Iterator<TopicPartition> partitionsIterator;
+        private Future<RecordMetadata> sendFuture;
+        private AtomicLong transactionsCommitted;
+        private boolean enableTransactions;
+
         SendRecords(HashSet<TopicPartition> activePartitions) {
             this.activePartitions = activePartitions;
+            this.partitionsIterator = activePartitions.iterator();
             this.histogram = new Histogram(5000);
+
+            this.transactionGenerator = spec.transactionGenerator();
+            this.enableTransactions = this.transactionGenerator.isPresent();
+            this.transactionsCommitted = new AtomicLong();
+
             int perPeriod = WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
             this.statusUpdaterFuture = executor.scheduleWithFixedDelay(
-                new StatusUpdater(histogram), 30, 30, TimeUnit.SECONDS);
+                new StatusUpdater(histogram, transactionsCommitted), 30, 30, TimeUnit.SECONDS);
+
             Properties props = new Properties();
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
-            // add common client configs to producer properties, and then user-specified producer
-            // configs
+            if (enableTransactions)
+                props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "produce-bench-transaction-id-" + UUID.randomUUID());
+            // add common client configs to producer properties, and then user-specified producer configs
             WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.producerConf());
             this.producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
             this.keys = new PayloadIterator(spec.keyGenerator());
@@ -202,23 +221,29 @@ public class ProduceBenchWorker implements TaskWorker {
         public Void call() throws Exception {
             long startTimeMs = Time.SYSTEM.milliseconds();
             try {
-                Future<RecordMetadata> future = null;
                 try {
-                    Iterator<TopicPartition> iter = activePartitions.iterator();
-                    for (int m = 0; m < spec.maxMessages(); m++) {
-                        if (!iter.hasNext()) {
-                            iter = activePartitions.iterator();
+                    if (enableTransactions)
+                        producer.initTransactions();
+
+                    int sentMessages = 0;
+                    while (sentMessages < spec.maxMessages()) {
+                        if (enableTransactions) {
+                            boolean tookAction = takeTransactionAction();
+                            if (tookAction)
+                                continue;
                         }
-                        TopicPartition partition = iter.next();
-                        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
-                            partition.topic(), partition.partition(), keys.next(), values.next());
-                        future = producer.send(record,
-                            new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
-                        throttle.increment();
+                        sendMessage();
+                        sentMessages++;
                     }
+                    if (enableTransactions)
+                        takeTransactionAction(); // give the transactionGenerator a chance to commit if configured evenly
+                } catch (Exception e) {
+                    if (enableTransactions)
+                        producer.abortTransaction();
+                    throw e;
                 } finally {
-                    if (future != null) {
-                        future.get();
+                    if (sendFuture != null) {
+                        sendFuture.get();
                     }
                     producer.close();
                 }
@@ -226,7 +251,7 @@ public class ProduceBenchWorker implements TaskWorker {
                 WorkerUtils.abort(log, "SendRecords", e, doneFuture);
             } finally {
                 statusUpdaterFuture.cancel(false);
-                StatusData statusData = new StatusUpdater(histogram).update();
+                StatusData statusData = new StatusUpdater(histogram, transactionsCommitted).update();
                 long curTimeMs = Time.SYSTEM.milliseconds();
                 log.info("Sent {} total record(s) in {} ms.  status: {}",
                     histogram.summarize().numSamples(), curTimeMs - startTimeMs, statusData);
@@ -235,6 +260,42 @@ public class ProduceBenchWorker implements TaskWorker {
             return null;
         }
 
+        private boolean takeTransactionAction() {
+            boolean tookAction = true;
+            TransactionAction nextAction = transactionGenerator.get().nextAction();
+            switch (nextAction) {
+                case BEGIN_TRANSACTION:
+                    log.debug("Beginning transaction.");
+                    producer.beginTransaction();
+                    break;
+                case COMMIT_TRANSACTION:
+                    log.debug("Committing transaction.");
+                    producer.commitTransaction();
+                    transactionsCommitted.getAndIncrement();
+                    break;
+                case ABORT_TRANSACTION:
+                    log.debug("Aborting transaction.");
+                    producer.abortTransaction();
+                    break;
+                case NO_OP:
+                    tookAction = false;
+                    break;
+            }
+            return tookAction;
+        }
+
+        private void sendMessage() throws InterruptedException {
+            if (!partitionsIterator.hasNext())
+                partitionsIterator = activePartitions.iterator();
+
+            TopicPartition partition = partitionsIterator.next();
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                partition.topic(), partition.partition(), keys.next(), values.next());
+            sendFuture = producer.send(record,
+                new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
+            throttle.increment();
+        }
+
         void recordDuration(long durationMs) {
             histogram.add(durationMs);
         }
@@ -242,9 +303,11 @@ public class ProduceBenchWorker implements TaskWorker {
 
     public class StatusUpdater implements Runnable {
         private final Histogram histogram;
+        private final AtomicLong transactionsCommitted;
 
-        StatusUpdater(Histogram histogram) {
+        StatusUpdater(Histogram histogram, AtomicLong transactionsCommitted) {
             this.histogram = histogram;
+            this.transactionsCommitted = transactionsCommitted;
         }
 
         @Override
@@ -261,7 +324,8 @@ public class ProduceBenchWorker implements TaskWorker {
             StatusData statusData = new StatusData(summary.numSamples(), summary.average(),
                 summary.percentiles().get(0).value(),
                 summary.percentiles().get(1).value(),
-                summary.percentiles().get(2).value());
+                summary.percentiles().get(2).value(),
+                transactionsCommitted.get());
             status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
             return statusData;
         }
@@ -273,6 +337,7 @@ public class ProduceBenchWorker implements TaskWorker {
         private final int p50LatencyMs;
         private final int p95LatencyMs;
         private final int p99LatencyMs;
+        private final long transactionsCommitted;
 
         /**
          * The percentiles to use when calculating the histogram data.
@@ -285,12 +350,14 @@ public class ProduceBenchWorker implements TaskWorker {
                    @JsonProperty("averageLatencyMs") float averageLatencyMs,
                    @JsonProperty("p50LatencyMs") int p50latencyMs,
                    @JsonProperty("p95LatencyMs") int p95latencyMs,
-                   @JsonProperty("p99LatencyMs") int p99latencyMs) {
+                   @JsonProperty("p99LatencyMs") int p99latencyMs,
+                   @JsonProperty("transactionsCommitted") long transactionsCommitted) {
             this.totalSent = totalSent;
             this.averageLatencyMs = averageLatencyMs;
             this.p50LatencyMs = p50latencyMs;
             this.p95LatencyMs = p95latencyMs;
             this.p99LatencyMs = p99latencyMs;
+            this.transactionsCommitted = transactionsCommitted;
         }
 
         @JsonProperty
@@ -299,6 +366,11 @@ public class ProduceBenchWorker implements TaskWorker {
         }
 
         @JsonProperty
+        public long transactionsCommitted() {
+            return transactionsCommitted;
+        }
+
+        @JsonProperty
         public float averageLatencyMs() {
             return averageLatencyMs;
         }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java
new file mode 100644
index 0000000..5ec47ec
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * Generates actions that should be taken by a producer that uses transactions.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
+    include = JsonTypeInfo.As.PROPERTY,
+    property = "type")
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(value = UniformTransactionsGenerator.class, name = "uniform"),
+})
+public interface TransactionGenerator {
+    enum TransactionAction {
+        BEGIN_TRANSACTION, COMMIT_TRANSACTION, ABORT_TRANSACTION, NO_OP
+    }
+
+    /**
+     * Returns the next action that the producer should take in regards to transactions.
+     * This method should be called every time before a producer sends a message.
+     * This means that most of the time it should return #{@link TransactionAction#NO_OP}
+     * to signal the producer that its next step should be to send a message.
+     */
+    TransactionAction nextAction();
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformTransactionsGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformTransactionsGenerator.java
new file mode 100644
index 0000000..1fbfbc2
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformTransactionsGenerator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A uniform transactions generator where every N records are grouped in a separate transaction
+ */
+public class UniformTransactionsGenerator implements TransactionGenerator {
+
+    private final int messagesPerTransaction;
+    private int messagesInTransaction = -1;
+
+    @JsonCreator
+    public UniformTransactionsGenerator(@JsonProperty("messagesPerTransaction") int messagesPerTransaction) {
+        if (messagesPerTransaction < 1)
+            throw new IllegalArgumentException("Cannot have less than one message per transaction.");
+
+        this.messagesPerTransaction = messagesPerTransaction;
+    }
+
+    @JsonProperty
+    public int messagesPerTransaction() {
+        return messagesPerTransaction;
+    }
+
+    @Override
+    public synchronized TransactionAction nextAction() {
+        if (messagesInTransaction == -1) {
+            messagesInTransaction = 0;
+            return TransactionAction.BEGIN_TRANSACTION;
+        }
+        if (messagesInTransaction == messagesPerTransaction) {
+            messagesInTransaction = -1;
+            return TransactionAction.COMMIT_TRANSACTION;
+        }
+
+        messagesInTransaction += 1;
+        return TransactionAction.NO_OP;
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index 5e6ff81..c324ec4 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -37,6 +37,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.Assert.assertNotNull;
 
@@ -54,7 +55,7 @@ public class JsonSerializationTest {
         verify(new WorkerRunning(null, null, 0, null));
         verify(new WorkerStopping(null, null, 0, null));
         verify(new ProduceBenchSpec(0, 0, null, null,
-            0, 0, null, null, null, null, null, null, null));
+            0, 0, null, null, Optional.empty(), null, null, null, null, null));
         verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, null,
             0, null, null, 0));
         verify(new TopicsSpec());


Mime
View raw message