This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6217178 KAFKA-7819: Improve RoundTripWorker (#6187)
6217178 is described below
commit 62171781396b613b6be8e13a2541ab0895b9bb6b
Author: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
AuthorDate: Thu Mar 21 19:03:09 2019 +0200
KAFKA-7819: Improve RoundTripWorker (#6187)
RoundTripWorker should use a long field for maxMessages rather than an int. The consumer
group used should be unique as well.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
---
.../kafka/trogdor/workload/RoundTripWorker.java | 90 ++++++++++++++--------
.../trogdor/workload/RoundTripWorkloadSpec.java | 6 +-
2 files changed, 62 insertions(+), 34 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index b22292a..d08d807 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -57,11 +57,13 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
public class RoundTripWorker implements TaskWorker {
private static final int THROTTLE_PERIOD_MS = 100;
@@ -82,6 +84,10 @@ public class RoundTripWorker implements TaskWorker {
private final AtomicBoolean running = new AtomicBoolean(false);
+ private final Lock lock = new ReentrantLock();
+
+ private final Condition unackedSendsAreZero = lock.newCondition();
+
private ScheduledExecutorService executor;
private WorkerStatusTracker status;
@@ -92,7 +98,7 @@ public class RoundTripWorker implements TaskWorker {
private KafkaConsumer<byte[], byte[]> consumer;
- private CountDownLatch unackedSends;
+ private Long unackedSends;
private ToSendTracker toSendTracker;
@@ -114,7 +120,7 @@ public class RoundTripWorker implements TaskWorker {
this.doneFuture = doneFuture;
this.producer = null;
this.consumer = null;
- this.unackedSends = new CountDownLatch(spec.maxMessages());
+ this.unackedSends = spec.maxMessages();
executor.submit(new Prepare());
}
@@ -157,29 +163,29 @@ public class RoundTripWorker implements TaskWorker {
}
private static class ToSendTrackerResult {
- final int index;
+ final long index;
final boolean firstSend;
- ToSendTrackerResult(int index, boolean firstSend) {
+ ToSendTrackerResult(long index, boolean firstSend) {
this.index = index;
this.firstSend = firstSend;
}
}
private static class ToSendTracker {
- private final int maxMessages;
- private final List<Integer> failed = new ArrayList<>();
- private int frontier = 0;
+ private final long maxMessages;
+ private final List<Long> failed = new ArrayList<>();
+ private long frontier = 0;
- ToSendTracker(int maxMessages) {
+ ToSendTracker(long maxMessages) {
this.maxMessages = maxMessages;
}
- synchronized void addFailed(int index) {
+ synchronized void addFailed(long index) {
failed.add(index);
}
- synchronized int frontier() {
+ synchronized long frontier() {
return frontier;
}
@@ -232,7 +238,7 @@ public class RoundTripWorker implements TaskWorker {
break;
}
throttle.increment();
- final int messageIndex = result.index;
+ final long messageIndex = result.index;
if (result.firstSend) {
toReceiveTracker.addPending(messageIndex);
uniqueMessagesSent++;
@@ -248,7 +254,14 @@ public class RoundTripWorker implements TaskWorker {
spec.valueGenerator().generate(messageIndex));
producer.send(record, (metadata, exception) -> {
if (exception == null) {
- unackedSends.countDown();
+ try {
+ lock.lock();
+ unackedSends -= 1;
+ if (unackedSends <= 0)
+ unackedSendsAreZero.signalAll();
+ } finally {
+ lock.unlock();
+ }
} else {
log.info("{}: Got exception when sending message {}: {}",
id, messageIndex, exception.getMessage());
@@ -259,23 +272,28 @@ public class RoundTripWorker implements TaskWorker {
} catch (Throwable e) {
WorkerUtils.abort(log, "ProducerRunnable", e, doneFuture);
} finally {
- log.info("{}: ProducerRunnable is exiting. messagesSent={}; uniqueMessagesSent={}; " +
- "ackedSends={}.", id, messagesSent, uniqueMessagesSent,
- spec.maxMessages() - unackedSends.getCount());
+ try {
+ lock.lock();
+ log.info("{}: ProducerRunnable is exiting. messagesSent={}; uniqueMessagesSent={}; " +
+ "ackedSends={}/{}.", id, messagesSent, uniqueMessagesSent,
+ spec.maxMessages() - unackedSends, spec.maxMessages());
+ } finally {
+ lock.unlock();
+ }
}
}
}
private class ToReceiveTracker {
- private final TreeSet<Integer> pending = new TreeSet<>();
+ private final TreeSet<Long> pending = new TreeSet<>();
- private int totalReceived = 0;
+ private long totalReceived = 0;
- synchronized void addPending(int messageIndex) {
+ synchronized void addPending(long messageIndex) {
pending.add(messageIndex);
}
- synchronized boolean removePending(int messageIndex) {
+ synchronized boolean removePending(long messageIndex) {
if (pending.remove(messageIndex)) {
totalReceived++;
return true;
@@ -284,18 +302,18 @@ public class RoundTripWorker implements TaskWorker {
}
}
- synchronized int totalReceived() {
+ synchronized long totalReceived() {
return totalReceived;
}
void log() {
- int numToReceive;
- List<Integer> list = new ArrayList<>(LOG_NUM_MESSAGES);
+ long numToReceive;
+ List<Long> list = new ArrayList<>(LOG_NUM_MESSAGES);
synchronized (this) {
numToReceive = pending.size();
- for (Iterator<Integer> iter = pending.iterator();
+ for (Iterator<Long> iter = pending.iterator();
iter.hasNext() && (list.size() < LOG_NUM_MESSAGES); )
{
- Integer i = iter.next();
+ Long i = iter.next();
list.add(i);
}
}
@@ -311,7 +329,7 @@ public class RoundTripWorker implements TaskWorker {
this.props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "round-trip-consumer-group-1");
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "round-trip-consumer-group-" + id);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
@@ -341,9 +359,16 @@ public class RoundTripWorker implements TaskWorker {
if (toReceiveTracker.removePending(messageIndex)) {
uniqueMessagesReceived++;
if (uniqueMessagesReceived >= spec.maxMessages()) {
- log.info("{}: Consumer received the full count of {} unique messages. " +
- "Waiting for all sends to be acked...", id, spec.maxMessages());
- unackedSends.await();
+ try {
+ lock.lock();
+ log.info("{}: Consumer received the full count of {} unique messages. " +
+ "Waiting for all {} sends to be acked...", id, spec.maxMessages(), unackedSends);
+ while (unackedSends > 0)
+ unackedSendsAreZero.await();
+ } finally {
+ lock.unlock();
+ }
+
log.info("{}: all sends have been acked.", id);
new StatusUpdater().update();
doneFuture.complete("");
@@ -360,6 +385,8 @@ public class RoundTripWorker implements TaskWorker {
log.debug("{}: Consumer got WakeupException", id, e);
} catch (TimeoutException e) {
log.debug("{}: Consumer got TimeoutException", id, e);
+ } finally {
+ lock.unlock();
}
}
} catch (Throwable e) {
@@ -415,9 +442,9 @@ public class RoundTripWorker implements TaskWorker {
@Override
public void stop(Platform platform) throws Exception {
if (!running.compareAndSet(true, false)) {
- throw new IllegalStateException("ProduceBenchWorker is not running.");
+ throw new IllegalStateException("RoundTripWorker is not running.");
}
- log.info("{}: Deactivating RoundTripWorkloadWorker.", id);
+ log.info("{}: Deactivating RoundTripWorker.", id);
doneFuture.complete("");
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.DAYS);
@@ -428,5 +455,6 @@ public class RoundTripWorker implements TaskWorker {
this.unackedSends = null;
this.executor = null;
this.doneFuture = null;
+ log.info("{}: Deactivated RoundTripWorker.", id);
}
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
index 42e09ee..fd30e8e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
@@ -36,7 +36,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
private final int targetMessagesPerSec;
private final PayloadGenerator valueGenerator;
private final TopicsSpec activeTopics;
- private final int maxMessages;
+ private final long maxMessages;
private final Map<String, String> commonClientConf;
private final Map<String, String> producerConf;
private final Map<String, String> consumerConf;
@@ -54,7 +54,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
@JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
@JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
@JsonProperty("activeTopics") TopicsSpec activeTopics,
- @JsonProperty("maxMessages") int maxMessages) {
+ @JsonProperty("maxMessages") long maxMessages) {
super(startMs, durationMs);
this.clientNode = clientNode == null ? "" : clientNode;
this.bootstrapServers = bootstrapServers == null ? "" : bootstrapServers;
@@ -96,7 +96,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
}
@JsonProperty
- public int maxMessages() {
+ public long maxMessages() {
return maxMessages;
}
|