kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6658; Fix RoundTripWorkload and make k/v generation configurable (#4710)
Date Fri, 16 Mar 2018 23:15:53 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a70e4f9  KAFKA-6658; Fix RoundTripWorkload and make k/v generation configurable (#4710)
a70e4f9 is described below

commit a70e4f95d713dd34757b3807171a2e520dd9c04d
Author: Colin Patrick McCabe <colin@cmccabe.xyz>
AuthorDate: Fri Mar 16 16:15:49 2018 -0700

    KAFKA-6658; Fix RoundTripWorkload and make k/v generation configurable (#4710)
    
    Make PayloadGenerator an interface which can have multiple implementations: constant, uniform random, sequential.
    
    Allow different payload generators to be used for keys and values.
    
    This change fixes RoundTripWorkload.  Previously RoundTripWorkload was unable to get the sequence number of the keys that it produced.
---
 .../trogdor/workload/ConstantPayloadGenerator.java |  54 ++++++
 .../kafka/trogdor/workload/PayloadGenerator.java   | 149 +++--------------
 .../kafka/trogdor/workload/PayloadIterator.java    |  55 ++++++
 .../kafka/trogdor/workload/ProduceBenchSpec.java   |  20 ++-
 .../kafka/trogdor/workload/ProduceBenchWorker.java |  16 +-
 .../kafka/trogdor/workload/RoundTripWorker.java    |  10 +-
 .../trogdor/workload/RoundTripWorkloadSpec.java    |   9 +
 .../workload/SequentialPayloadGenerator.java       |  65 ++++++++
 .../workload/UniformRandomPayloadGenerator.java    |  89 ++++++++++
 .../trogdor/common/JsonSerializationTest.java      |   4 +-
 .../trogdor/workload/PayloadGeneratorTest.java     | 184 ++++++++++-----------
 11 files changed, 422 insertions(+), 233 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantPayloadGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantPayloadGenerator.java
new file mode 100644
index 0000000..d0c1c48
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantPayloadGenerator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A PayloadGenerator which always generates a constant payload.
+ */
+public class ConstantPayloadGenerator implements PayloadGenerator {
+    private final int size;
+    private final byte[] value;
+
+    @JsonCreator
+    public ConstantPayloadGenerator(@JsonProperty("size") int size,
+                                    @JsonProperty("value") byte[] value) {
+        this.size = size;
+        this.value = (value == null || value.length == 0) ? new byte[size] : value;
+    }
+
+    @JsonProperty
+    public int size() {
+        return size;
+    }
+
+    @JsonProperty
+    public byte[] value() {
+        return value;
+    }
+
+    @Override
+    public byte[] generate(long position) {
+        byte[] next = new byte[size];
+        for (int i = 0; i < next.length; i += value.length) {
+            System.arraycopy(value, 0, next, i, Math.min(next.length - i, value.length));
+        }
+        return next;
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
index 9acd5fa..4895f21 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
@@ -14,133 +14,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.trogdor.workload;
 
-import org.apache.kafka.clients.producer.ProducerRecord;
+package org.apache.kafka.trogdor.workload;
 
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Random;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 /**
- * Describes the payload for the producer record. Currently, it generates constant size values
- * and either null keys or constant size key (depending on requested key type). The generator
- * is deterministic -- two generator objects created with the same key type, message size, and
- * value divergence ratio (see `valueDivergenceRatio` description) will generate the same sequence
- * of key/value pairs.
+ * Generates byte arrays based on a position argument.
+ *
+ * The array generated at a given position should be the same no matter how many
+ * times generate() is invoked.  PayloadGenerator instances should be immutable
+ * and thread-safe.
  */
-public class PayloadGenerator {
-
-    public static final double DEFAULT_VALUE_DIVERGENCE_RATIO = 0.3;
-    public static final int DEFAULT_MESSAGE_SIZE = 512;
-
-    /**
-     * This is the ratio of how much each next value is different from the previous value. This
-     * is directly related to compression rate we will get. Example: 0.3 divergence ratio gets us
-     * about 0.3 - 0.45 compression rate with lz4.
-     */
-    private final double valueDivergenceRatio;
-    private final long baseSeed;
-    private long currentPosition;
-    private byte[] baseRecordValue;
-    private PayloadKeyType recordKeyType;
-    private Random random;
-
-    public PayloadGenerator() {
-        this(DEFAULT_MESSAGE_SIZE, PayloadKeyType.KEY_NULL, DEFAULT_VALUE_DIVERGENCE_RATIO);
-    }
-
-    /**
-     * Generator will generate null keys and values of size `messageSize`
-     * @param messageSize number of bytes used for key + value
-     */
-    public PayloadGenerator(int messageSize) {
-        this(messageSize, PayloadKeyType.KEY_NULL, DEFAULT_VALUE_DIVERGENCE_RATIO);
-    }
-
-    /**
-     * Generator will generate keys of given type and values of size 'messageSize' - (key size).
-     * If the given key type requires more bytes than messageSize, then the resulting payload
-     * will be keys of size required for the given key type and 0-length values.
-     * @param messageSize number of bytes used for key + value
-     * @param keyType type of keys generated
-     */
-    public PayloadGenerator(int messageSize, PayloadKeyType keyType) {
-        this(messageSize, keyType, DEFAULT_VALUE_DIVERGENCE_RATIO);
-    }
-
-    /**
-     * Generator will generate keys of given type and values of size 'messageSize' - (key size).
-     * If the given key type requires more bytes than messageSize, then the resulting payload
-     * will be keys of size required for the given key type and 0-length values.
-     * @param messageSize key + value size
-     * @param valueDivergenceRatio ratio of how much each next value is different from the previous
-     *                             value. Used to approximately control target compression rate (if
-     *                             compression is used).
-     */
-    public PayloadGenerator(int messageSize, PayloadKeyType keyType,
-                            double valueDivergenceRatio) {
-        this.baseSeed = 856;  // some random number, may later let pass seed to constructor
-        this.currentPosition = 0;
-        this.valueDivergenceRatio = valueDivergenceRatio;
-        this.random = new Random(this.baseSeed);
-
-        final int valueSize = (messageSize > keyType.maxSizeInBytes())
-                              ? messageSize - keyType.maxSizeInBytes() : 0;
-        this.baseRecordValue = new byte[valueSize];
-        // initialize value with random bytes
-        for (int i = 0; i < baseRecordValue.length; ++i) {
-            baseRecordValue[i] = (byte) (random.nextInt(26) + 65);
-        }
-        this.recordKeyType = keyType;
-    }
-
-    /**
-     * Returns current position of the payload generator.
-     */
-    public long position() {
-        return currentPosition;
-    }
-
-    /**
-     * Creates record based on the current position, and increments current position.
-     */
-    public ProducerRecord<byte[], byte[]> nextRecord(String topicName) {
-        return nextRecord(topicName, currentPosition++);
-    }
-
-    /**
-     * Creates record based on the given position. Does not change the current position.
-     */
-    public ProducerRecord<byte[], byte[]> nextRecord(String topicName, long position) {
-        byte[] keyBytes = null;
-        if (recordKeyType == PayloadKeyType.KEY_MESSAGE_INDEX) {
-            keyBytes = ByteBuffer.allocate(recordKeyType.maxSizeInBytes()).putLong(position).array();
-        } else if (recordKeyType != PayloadKeyType.KEY_NULL) {
-            throw new UnsupportedOperationException(
-                "PayloadGenerator does not know how to generate key for key type " + recordKeyType);
-        }
-        return new ProducerRecord<>(topicName, keyBytes, nextValue(position));
-    }
-
-    @Override
-    public String toString() {
-        return "PayloadGenerator(recordKeySize=" + recordKeyType.maxSizeInBytes()
-               + ", recordValueSize=" + baseRecordValue.length
-               + ", valueDivergenceRatio=" + valueDivergenceRatio + ")";
-    }
-
-    /**
-     * Returns producer record value
-     */
-    private byte[] nextValue(long position) {
-        // set the seed based on the given position to make sure that the same value is generated
-        // for the same position.
-        random.setSeed(baseSeed + 31 * position + 1);
-        // randomize some of the payload to achieve expected compression rate
-        byte[] recordValue = Arrays.copyOf(baseRecordValue, baseRecordValue.length);
-        for (int i = 0; i < recordValue.length * valueDivergenceRatio; ++i)
-            recordValue[i] = (byte) (random.nextInt(26) + 65);
-        return recordValue;
-    }
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
+    include = JsonTypeInfo.As.PROPERTY,
+    property = "type")
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(value = ConstantPayloadGenerator.class, name = "constant"),
+    @JsonSubTypes.Type(value = SequentialPayloadGenerator.class, name = "sequential"),
+    @JsonSubTypes.Type(value = UniformRandomPayloadGenerator.class, name = "uniformRandom")
+    })
+public interface PayloadGenerator {
+    /**
+     * Generate a payload.
+     *
+     * @param position  The position to use to generate the payload
+     *
+     * @return          A new array object containing the payload.
+     */
+    byte[] generate(long position);
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadIterator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadIterator.java
new file mode 100644
index 0000000..a5f3bae
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadIterator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import java.util.Iterator;
+
+/**
+ * An iterator which wraps a PayloadGenerator.
+ */
+public final class PayloadIterator implements Iterator<byte[]> {
+    private final PayloadGenerator generator;
+    private long position = 0;
+
+    public PayloadIterator(PayloadGenerator generator) {
+        this.generator = generator;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return true;
+    }
+
+    @Override
+    public synchronized byte[] next() {
+        return generator.generate(position++);
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    public synchronized void seek(long position) {
+        this.position = position;
+    }
+
+    public synchronized long position() {
+        return this.position;
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index 3e05a53..a798e73 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -37,7 +37,8 @@ public class ProduceBenchSpec extends TaskSpec {
     private final String bootstrapServers;
     private final int targetMessagesPerSec;
     private final int maxMessages;
-    private final int messageSize;
+    private final PayloadGenerator keyGenerator;
+    private final PayloadGenerator valueGenerator;
     private final Map<String, String> producerConf;
     private final int totalTopics;
     private final int activeTopics;
@@ -49,7 +50,8 @@ public class ProduceBenchSpec extends TaskSpec {
                          @JsonProperty("bootstrapServers") String bootstrapServers,
                          @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
                          @JsonProperty("maxMessages") int maxMessages,
-                         @JsonProperty("messageSize") int messageSize,
+                         @JsonProperty("keyGenerator") PayloadGenerator keyGenerator,
+                         @JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
                          @JsonProperty("producerConf") Map<String, String> producerConf,
                          @JsonProperty("totalTopics") int totalTopics,
                          @JsonProperty("activeTopics") int activeTopics) {
@@ -58,7 +60,10 @@ public class ProduceBenchSpec extends TaskSpec {
         this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
         this.targetMessagesPerSec = targetMessagesPerSec;
         this.maxMessages = maxMessages;
-        this.messageSize = (messageSize == 0) ? PayloadGenerator.DEFAULT_MESSAGE_SIZE : messageSize;
+        this.keyGenerator = keyGenerator == null ?
+            new SequentialPayloadGenerator(4, 0) : keyGenerator;
+        this.valueGenerator = valueGenerator == null ?
+            new ConstantPayloadGenerator(512, new byte[0]) : valueGenerator;
         this.producerConf = (producerConf == null) ? new TreeMap<String, String>() : producerConf;
         this.totalTopics = totalTopics;
         this.activeTopics = activeTopics;
@@ -85,8 +90,13 @@ public class ProduceBenchSpec extends TaskSpec {
     }
 
     @JsonProperty
-    public int messageSize() {
-        return messageSize;
+    public PayloadGenerator keyGenerator() {
+        return keyGenerator;
+    }
+
+    @JsonProperty
+    public PayloadGenerator valueGenerator() {
+        return valueGenerator;
     }
 
     @JsonProperty
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index 1bd386d..51f52d3 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -91,7 +91,7 @@ public class ProduceBenchWorker implements TaskWorker {
         if (!running.compareAndSet(false, true)) {
             throw new IllegalStateException("ProducerBenchWorker is already running.");
         }
-        log.info("{}: Activating ProduceBenchWorker.", id);
+        log.info("{}: Activating ProduceBenchWorker with {}", id, spec);
         this.executor = Executors.newScheduledThreadPool(1,
             ThreadUtils.createThreadFactory("ProduceBenchWorkerThread%d", false));
         this.status = status;
@@ -172,7 +172,9 @@ public class ProduceBenchWorker implements TaskWorker {
 
         private final KafkaProducer<byte[], byte[]> producer;
 
-        private final PayloadGenerator payloadGenerator;
+        private final PayloadIterator keys;
+
+        private final PayloadIterator values;
 
         private final Throttle throttle;
 
@@ -187,7 +189,8 @@ public class ProduceBenchWorker implements TaskWorker {
                 props.setProperty(entry.getKey(), entry.getValue());
             }
             this.producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
-            this.payloadGenerator = new PayloadGenerator(spec.messageSize());
+            this.keys = new PayloadIterator(spec.keyGenerator());
+            this.values = new PayloadIterator(spec.valueGenerator());
             this.throttle = new SendRecordsThrottle(perPeriod, producer);
         }
 
@@ -199,8 +202,10 @@ public class ProduceBenchWorker implements TaskWorker {
                 try {
                     for (int m = 0; m < spec.maxMessages(); m++) {
                         for (int i = 0; i < spec.activeTopics(); i++) {
-                            ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord(topicIndexToName(i));
-                            future = producer.send(record, new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
+                            ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(
+                                topicIndexToName(i), 0, keys.next(), values.next());
+                            future = producer.send(record,
+                                new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
                         }
                         throttle.increment();
                     }
@@ -216,7 +221,6 @@ public class ProduceBenchWorker implements TaskWorker {
                 statusUpdaterFuture.cancel(false);
                 new StatusUpdater(histogram).run();
                 long curTimeMs = Time.SYSTEM.milliseconds();
-                log.info("Produced {}", payloadGenerator);
                 log.info("Sent {} total record(s) in {} ms.  status: {}",
                     histogram.summarize().numSamples(), curTimeMs - startTimeMs, status.get());
             }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 5dfac1f..1b9cb8f 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -43,6 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -69,6 +70,8 @@ public class RoundTripWorker implements TaskWorker {
 
     private static final Logger log = LoggerFactory.getLogger(RoundTripWorker.class);
 
+    private static final PayloadGenerator KEY_GENERATOR = new SequentialPayloadGenerator(4, 0);
+
     private final ToReceiveTracker toReceiveTracker = new ToReceiveTracker();
 
     private final String id;
@@ -183,7 +186,6 @@ public class RoundTripWorker implements TaskWorker {
             int perPeriod = WorkerUtils.
                 perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
             this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
-            payloadGenerator = new PayloadGenerator(MESSAGE_SIZE, PayloadKeyType.KEY_MESSAGE_INDEX);
         }
 
         @Override
@@ -206,7 +208,9 @@ public class RoundTripWorker implements TaskWorker {
                     }
                     messagesSent++;
                     // we explicitly specify generator position based on message index
-                    ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord(TOPIC_NAME, messageIndex);
+                    ProducerRecord<byte[], byte[]> record = new ProducerRecord(TOPIC_NAME, 0,
+                        KEY_GENERATOR.generate(messageIndex),
+                        spec.valueGenerator().generate(messageIndex));
                     producer.send(record, new Callback() {
                         @Override
                         public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -286,7 +290,7 @@ public class RoundTripWorker implements TaskWorker {
                         pollInvoked++;
                         ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
                         for (ConsumerRecord<byte[], byte[]> record : records.records(TOPIC_NAME)) {
-                            int messageIndex = ByteBuffer.wrap(record.key()).getInt();
+                            int messageIndex = ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt();
                             messagesReceived++;
                             if (toReceiveTracker.removePending(messageIndex)) {
                                 uniqueMessagesReceived++;
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
index 618c709..00bd833 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
@@ -39,6 +39,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
     private final String bootstrapServers;
     private final int targetMessagesPerSec;
     private final NavigableMap<Integer, List<Integer>> partitionAssignments;
+    private final PayloadGenerator valueGenerator;
     private final int maxMessages;
 
     @JsonCreator
@@ -48,6 +49,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
              @JsonProperty("bootstrapServers") String bootstrapServers,
              @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
              @JsonProperty("partitionAssignments") NavigableMap<Integer, List<Integer>> partitionAssignments,
+             @JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
              @JsonProperty("maxMessages") int maxMessages) {
         super(startMs, durationMs);
         this.clientNode = clientNode == null ? "" : clientNode;
@@ -55,6 +57,8 @@ public class RoundTripWorkloadSpec extends TaskSpec {
         this.targetMessagesPerSec = targetMessagesPerSec;
         this.partitionAssignments = partitionAssignments == null ?
             new TreeMap<Integer, List<Integer>>() : partitionAssignments;
+        this.valueGenerator = valueGenerator == null ?
+            new UniformRandomPayloadGenerator(32, 123, 10) : valueGenerator;
         this.maxMessages = maxMessages;
     }
 
@@ -79,6 +83,11 @@ public class RoundTripWorkloadSpec extends TaskSpec {
     }
 
     @JsonProperty
+    public PayloadGenerator valueGenerator() {
+        return valueGenerator;
+    }
+
+    @JsonProperty
     public int maxMessages() {
         return maxMessages;
     }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/SequentialPayloadGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/SequentialPayloadGenerator.java
new file mode 100644
index 0000000..e0b785a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/SequentialPayloadGenerator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A PayloadGenerator which generates a sequentially increasing payload.
+ *
+ * The generated number will wrap around to 0 after the maximum value is reached.
+ * Payloads bigger than 8 bytes will always just be padded with zeros after byte 8.
+ */
+public class SequentialPayloadGenerator implements PayloadGenerator {
+    private final int size;
+    private final long startOffset;
+    private final ByteBuffer buf;
+
+    @JsonCreator
+    public SequentialPayloadGenerator(@JsonProperty("size") int size,
+                                      @JsonProperty("offset") long startOffset) {
+        this.size = size;
+        this.startOffset = startOffset;
+        this.buf = ByteBuffer.allocate(8);
+        // Little-endian byte order allows us to support arbitrary lengths more easily,
+        // since the first byte is always the lowest-order byte.
+        this.buf.order(ByteOrder.LITTLE_ENDIAN);
+    }
+
+    @JsonProperty
+    public int size() {
+        return size;
+    }
+
+    @JsonProperty
+    public long startOffset() {
+        return startOffset;
+    }
+
+    @Override
+    public synchronized byte[] generate(long position) {
+        buf.clear();
+        buf.putLong(position + startOffset);
+        byte[] result = new byte[size];
+        System.arraycopy(buf.array(), 0, result, 0, Math.min(buf.array().length, result.length));
+        return result;
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformRandomPayloadGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformRandomPayloadGenerator.java
new file mode 100644
index 0000000..4642dcf
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformRandomPayloadGenerator.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Random;
+
+/**
+ * A PayloadGenerator which generates a uniform random payload.
+ *
+ * This generator generates pseudo-random payloads that can be reproduced from run to run.
+ * The guarantees are the same as those of java.util.Random.
+ *
+ * This payload generator also has the option to append padding bytes at the end of the payload.
+ * The padding bytes are always the same, no matter what the position is.  This is useful when
+ * simulating a partly-compressible stream of user data.
+ */
+public class UniformRandomPayloadGenerator implements PayloadGenerator {
+    private final int size;
+    private final long seed;
+    private final int padding;
+    private final Random random = new Random();
+    private final byte[] padBytes;
+    private final byte[] randomBytes;
+
+    @JsonCreator
+    public UniformRandomPayloadGenerator(@JsonProperty("size") int size,
+                                         @JsonProperty("seed") long seed,
+                                         @JsonProperty("padding") int padding) {
+        this.size = size;
+        this.seed = seed;
+        this.padding = padding;
+        if (padding < 0 || padding > size) {
+            throw new RuntimeException("Invalid value " + padding + " for " +
+                "padding: the number of padding bytes must not be smaller than " +
+                "0 or greater than the total payload size.");
+        }
+        this.padBytes = new byte[padding];
+        random.setSeed(seed);
+        random.nextBytes(padBytes);
+        this.randomBytes = new byte[size - padding];
+    }
+
+    @JsonProperty
+    public int size() {
+        return size;
+    }
+
+    @JsonProperty
+    public long seed() {
+        return seed;
+    }
+
+    @JsonProperty
+    public int padding() {
+        return padding;
+    }
+
+    @Override
+    public synchronized byte[] generate(long position) {
+        byte[] result = new byte[size];
+        if (randomBytes.length > 0) {
+            random.setSeed(seed + position);
+            random.nextBytes(randomBytes);
+            System.arraycopy(randomBytes, 0, result, 0, Math.min(randomBytes.length, result.length));
+        }
+        if (padBytes.length > 0) {
+            System.arraycopy(padBytes, 0, result, randomBytes.length, result.length - randomBytes.length);
+        }
+        return result;
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index 4e65d99..77a7932 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -49,9 +49,9 @@ public class JsonSerializationTest {
         verify(new WorkerRunning(null, 0, null));
         verify(new WorkerStopping(null, 0, null));
         verify(new ProduceBenchSpec(0, 0, null, null,
-            0, 0, 0, null, 0, 0));
+            0, 0, null, null, null, 0, 0));
         verify(new RoundTripWorkloadSpec(0, 0, null, null,
-            0, null, 0));
+            0, null, null, 0));
         verify(new SampleTaskSpec(0, 0, 0, null));
     }
 
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java
index d2954a5..25ef2e3 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java
@@ -17,128 +17,126 @@
 
 package org.apache.kafka.trogdor.workload;
 
-import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Arrays;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 
 public class PayloadGeneratorTest {
+    @Rule
+    final public Timeout globalTimeout = Timeout.millis(120000);
 
     @Test
-    public void testGeneratorStartsAtPositionZero() {
-        PayloadGenerator payloadGenerator = new PayloadGenerator();
-        assertEquals(0, payloadGenerator.position());
+    public void testConstantPayloadGenerator() {
+        byte[] alphabet = new byte[26];
+        for (int i = 0; i < alphabet.length; i++) {
+            alphabet[i] = (byte) ('a' + i);
+        }
+        byte[] expectedSuperset = new byte[512];
+        for (int i = 0; i < expectedSuperset.length; i++) {
+            expectedSuperset[i] = (byte) ('a' + (i % 26));
+        }
+        for (int i : new int[] {1, 5, 10, 100, 511, 512}) {
+            ConstantPayloadGenerator generator = new ConstantPayloadGenerator(i, alphabet);
+            assertArrayContains(expectedSuperset, generator.generate(0));
+            assertArrayContains(expectedSuperset, generator.generate(10));
+            assertArrayContains(expectedSuperset, generator.generate(100));
+        }
     }
 
-    @Test
-    public void testDefaultPayload() {
-        final long numRecords = 262;
-        PayloadGenerator payloadGenerator = new PayloadGenerator();
-
-        // make sure that each time we produce a different value (except if compression rate is 0)
-        byte[] prevValue = null;
-        long expectedPosition = 0;
-        for (int i = 0; i < numRecords; i++) {
-            ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord("test-topic");
-            assertNull(record.key());
-            assertEquals(PayloadGenerator.DEFAULT_MESSAGE_SIZE, record.value().length);
-            assertEquals(++expectedPosition, payloadGenerator.position());
-            assertFalse("Position " + payloadGenerator.position(),
-                        Arrays.equals(prevValue, record.value()));
-            prevValue = record.value().clone();
-        }
+    private static void assertArrayContains(byte[] expectedSuperset, byte[] actual) {
+        byte[] expected = new byte[actual.length];
+        System.arraycopy(expectedSuperset, 0, expected, 0, expected.length);
+        assertArrayEquals(expected, actual);
     }
 
     @Test
-    public void testNullKeyTypeValueSizeIsMessageSize() {
-        final int size = 200;
-        PayloadGenerator payloadGenerator = new PayloadGenerator(size);
-        ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord("test-topic");
-        assertNull(record.key());
-        assertEquals(size, record.value().length);
+    public void testSequentialPayloadGenerator() {
+        SequentialPayloadGenerator g4 = new SequentialPayloadGenerator(4, 1);
+        assertLittleEndianArrayEquals(1, g4.generate(0));
+        assertLittleEndianArrayEquals(2, g4.generate(1));
+
+        SequentialPayloadGenerator g8 = new SequentialPayloadGenerator(8, 0);
+        assertLittleEndianArrayEquals(0, g8.generate(0));
+        assertLittleEndianArrayEquals(1, g8.generate(1));
+        assertLittleEndianArrayEquals(123123123123L, g8.generate(123123123123L));
+
+        SequentialPayloadGenerator g2 = new SequentialPayloadGenerator(2, 0);
+        assertLittleEndianArrayEquals(0, g2.generate(0));
+        assertLittleEndianArrayEquals(1, g2.generate(1));
+        assertLittleEndianArrayEquals(1, g2.generate(1));
+        assertLittleEndianArrayEquals(1, g2.generate(131073));
     }
 
-    @Test
-    public void testKeyContainsGeneratorPosition() {
-        final long numRecords = 10;
-        final int size = 200;
-        PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
-        for (int i = 0; i < numRecords; i++) {
-            assertEquals(i, generator.position());
-            ProducerRecord<byte[], byte[]> record = generator.nextRecord("test-topic");
-            assertEquals(8, record.key().length);
-            assertEquals(size - 8, record.value().length);
-            assertEquals("i=" + i, i, ByteBuffer.wrap(record.key()).getLong());
-        }
+    private static void assertLittleEndianArrayEquals(long expected, byte[] actual) {
+        byte[] longActual = new byte[8];
+        System.arraycopy(actual, 0, longActual, 0, Math.min(actual.length, longActual.length));
+        ByteBuffer buf = ByteBuffer.wrap(longActual).order(ByteOrder.LITTLE_ENDIAN);
+        assertEquals(expected, buf.getLong());
     }
 
     @Test
-    public void testGeneratePayloadWithExplicitPosition() {
-        final int size = 200;
-        PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
-        int position = 2;
-        while (position < 5000000) {
-            ProducerRecord<byte[], byte[]> record = generator.nextRecord("test-topic", position);
-            assertEquals(8, record.key().length);
-            assertEquals(size - 8, record.value().length);
-            assertEquals(position, ByteBuffer.wrap(record.key()).getLong());
-            position = position * 64;
+    public void testUniformRandomPayloadGenerator() {
+        PayloadIterator iter = new PayloadIterator(
+            new UniformRandomPayloadGenerator(1234, 456, 0));
+        byte[] prev = iter.next();
+        for (int uniques = 0; uniques < 1000; ) {
+            byte[] cur = iter.next();
+            assertEquals(prev.length, cur.length);
+            if (!Arrays.equals(prev, cur)) {
+                uniques++;
+            }
         }
+        testReproducible(new UniformRandomPayloadGenerator(1234, 456, 0));
+        testReproducible(new UniformRandomPayloadGenerator(1, 0, 0));
+        testReproducible(new UniformRandomPayloadGenerator(10, 6, 5));
+        testReproducible(new UniformRandomPayloadGenerator(512, 123, 100));
     }
 
-    public void testSamePositionGeneratesSameKeyAndValue() {
-        final int size = 100;
-        PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
-        ProducerRecord<byte[], byte[]> record1 = generator.nextRecord("test-topic");
-        assertEquals(1, generator.position());
-        ProducerRecord<byte[], byte[]> record2 = generator.nextRecord("test-topic");
-        assertEquals(2, generator.position());
-        ProducerRecord<byte[], byte[]> record3 = generator.nextRecord("test-topic", 0);
-        // position should not change if we generated record with specific position
-        assertEquals(2, generator.position());
-        assertFalse("Values at different positions should not match.",
-                    Arrays.equals(record1.value(), record2.value()));
-        assertFalse("Values at different positions should not match.",
-                    Arrays.equals(record3.value(), record2.value()));
-        assertTrue("Values at the same position should match.",
-                   Arrays.equals(record1.value(), record3.value()));
-    }
-
-    @Test
-    public void testGeneratesDeterministicKeyValues() {
-        final long numRecords = 194;
-        final int size = 100;
-        PayloadGenerator generator1 = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
-        PayloadGenerator generator2 = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
-        for (int i = 0; i < numRecords; ++i) {
-            ProducerRecord<byte[], byte[]> record1 = generator1.nextRecord("test-topic");
-            ProducerRecord<byte[], byte[]> record2 = generator2.nextRecord("test-topic");
-            assertTrue(Arrays.equals(record1.value(), record2.value()));
-            assertTrue(Arrays.equals(record1.key(), record2.key()));
-        }
+    private static void testReproducible(PayloadGenerator generator) {
+        byte[] val = generator.generate(123);
+        generator.generate(456);
+        byte[] val2 = generator.generate(123);
+        assertArrayEquals(val, val2);
     }
 
     @Test
-    public void testTooSmallMessageSizeCreatesPayloadWithOneByteValues() {
-        PayloadGenerator payloadGenerator = new PayloadGenerator(2, PayloadKeyType.KEY_MESSAGE_INDEX);
-        ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord("test-topic", 877);
-        assertEquals(8, record.key().length);
-        assertEquals(0, record.value().length);
+    public void testUniformRandomPayloadGeneratorPaddingBytes() {
+        UniformRandomPayloadGenerator generator =
+            new UniformRandomPayloadGenerator(1000, 456, 100);
+        byte[] val1 = generator.generate(0);
+        byte[] val1End = new byte[100];
+        System.arraycopy(val1, 900, val1End, 0, 100);
+        byte[] val2 = generator.generate(100);
+        byte[] val2End = new byte[100];
+        System.arraycopy(val2, 900, val2End, 0, 100);
+        byte[] val3 = generator.generate(200);
+        byte[] val3End = new byte[100];
+        System.arraycopy(val3, 900, val3End, 0, 100);
+        assertArrayEquals(val1End, val2End);
+        assertArrayEquals(val1End, val3End);
     }
 
     @Test
-    public void testNextRecordGeneratesNewByteArrayForValue() {
-        PayloadGenerator payloadGenerator = new PayloadGenerator(2, PayloadKeyType.KEY_MESSAGE_INDEX);
-        ProducerRecord<byte[], byte[]> record1 = payloadGenerator.nextRecord("test-topic", 877);
-        ProducerRecord<byte[], byte[]> record2 = payloadGenerator.nextRecord("test-topic", 877);
-        assertNotEquals(record1.value(), record2.value());
+    public void testPayloadIterator() {
+        final int expectedSize = 50;
+        PayloadIterator iter = new PayloadIterator(
+            new ConstantPayloadGenerator(expectedSize, new byte[0]));
+        final byte[] expected = new byte[expectedSize];
+        assertEquals(0, iter.position());
+        assertArrayEquals(expected, iter.next());
+        assertEquals(1, iter.position());
+        assertArrayEquals(expected, iter.next());
+        assertArrayEquals(expected, iter.next());
+        assertEquals(3, iter.position());
+        iter.seek(0);
+        assertEquals(0, iter.position());
     }
 }

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message