kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7819: Improve RoundTripWorker (#6187)
Date Thu, 21 Mar 2019 17:03:28 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6217178  KAFKA-7819: Improve RoundTripWorker (#6187)
6217178 is described below

commit 62171781396b613b6be8e13a2541ab0895b9bb6b
Author: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
AuthorDate: Thu Mar 21 19:03:09 2019 +0200

    KAFKA-7819: Improve RoundTripWorker (#6187)
    
    RoundTripWorker should use a long field for maxMessages rather than an int.  The consumer
    group used should be unique as well.
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>
---
 .../kafka/trogdor/workload/RoundTripWorker.java    | 90 ++++++++++++++--------
 .../trogdor/workload/RoundTripWorkloadSpec.java    |  6 +-
 2 files changed, 62 insertions(+), 34 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index b22292a..d08d807 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -57,11 +57,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.TreeSet;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class RoundTripWorker implements TaskWorker {
     private static final int THROTTLE_PERIOD_MS = 100;
@@ -82,6 +84,10 @@ public class RoundTripWorker implements TaskWorker {
 
     private final AtomicBoolean running = new AtomicBoolean(false);
 
+    private final Lock lock = new ReentrantLock();
+
+    private final Condition unackedSendsAreZero = lock.newCondition();
+
     private ScheduledExecutorService executor;
 
     private WorkerStatusTracker status;
@@ -92,7 +98,7 @@ public class RoundTripWorker implements TaskWorker {
 
     private KafkaConsumer<byte[], byte[]> consumer;
 
-    private CountDownLatch unackedSends;
+    private Long unackedSends;
 
     private ToSendTracker toSendTracker;
 
@@ -114,7 +120,7 @@ public class RoundTripWorker implements TaskWorker {
         this.doneFuture = doneFuture;
         this.producer = null;
         this.consumer = null;
-        this.unackedSends = new CountDownLatch(spec.maxMessages());
+        this.unackedSends = spec.maxMessages();
         executor.submit(new Prepare());
     }
 
@@ -157,29 +163,29 @@ public class RoundTripWorker implements TaskWorker {
     }
 
     private static class ToSendTrackerResult {
-        final int index;
+        final long index;
         final boolean firstSend;
 
-        ToSendTrackerResult(int index, boolean firstSend) {
+        ToSendTrackerResult(long index, boolean firstSend) {
             this.index = index;
             this.firstSend = firstSend;
         }
     }
 
     private static class ToSendTracker {
-        private final int maxMessages;
-        private final List<Integer> failed = new ArrayList<>();
-        private int frontier = 0;
+        private final long maxMessages;
+        private final List<Long> failed = new ArrayList<>();
+        private long frontier = 0;
 
-        ToSendTracker(int maxMessages) {
+        ToSendTracker(long maxMessages) {
             this.maxMessages = maxMessages;
         }
 
-        synchronized void addFailed(int index) {
+        synchronized void addFailed(long index) {
             failed.add(index);
         }
 
-        synchronized int frontier() {
+        synchronized long frontier() {
             return frontier;
         }
 
@@ -232,7 +238,7 @@ public class RoundTripWorker implements TaskWorker {
                         break;
                     }
                     throttle.increment();
-                    final int messageIndex = result.index;
+                    final long messageIndex = result.index;
                     if (result.firstSend) {
                         toReceiveTracker.addPending(messageIndex);
                         uniqueMessagesSent++;
@@ -248,7 +254,14 @@ public class RoundTripWorker implements TaskWorker {
                         spec.valueGenerator().generate(messageIndex));
                     producer.send(record, (metadata, exception) -> {
                         if (exception == null) {
-                            unackedSends.countDown();
+                            try {
+                                lock.lock();
+                                unackedSends -= 1;
+                                if (unackedSends <= 0)
+                                    unackedSendsAreZero.signalAll();
+                            } finally {
+                                lock.unlock();
+                            }
                         } else {
                             log.info("{}: Got exception when sending message {}: {}",
                                 id, messageIndex, exception.getMessage());
@@ -259,23 +272,28 @@ public class RoundTripWorker implements TaskWorker {
             } catch (Throwable e) {
                 WorkerUtils.abort(log, "ProducerRunnable", e, doneFuture);
             } finally {
-                log.info("{}: ProducerRunnable is exiting.  messagesSent={}; uniqueMessagesSent={}; " +
-                        "ackedSends={}.", id, messagesSent, uniqueMessagesSent,
-                        spec.maxMessages() - unackedSends.getCount());
+                try {
+                    lock.lock();
+                    log.info("{}: ProducerRunnable is exiting.  messagesSent={}; uniqueMessagesSent={}; " +
+                                    "ackedSends={}/{}.", id, messagesSent, uniqueMessagesSent,
+                            spec.maxMessages() - unackedSends, spec.maxMessages());
+                } finally {
+                    lock.unlock();
+                }
             }
         }
     }
 
     private class ToReceiveTracker {
-        private final TreeSet<Integer> pending = new TreeSet<>();
+        private final TreeSet<Long> pending = new TreeSet<>();
 
-        private int totalReceived = 0;
+        private long totalReceived = 0;
 
-        synchronized void addPending(int messageIndex) {
+        synchronized void addPending(long messageIndex) {
             pending.add(messageIndex);
         }
 
-        synchronized boolean removePending(int messageIndex) {
+        synchronized boolean removePending(long messageIndex) {
             if (pending.remove(messageIndex)) {
                 totalReceived++;
                 return true;
@@ -284,18 +302,18 @@ public class RoundTripWorker implements TaskWorker {
             }
         }
 
-        synchronized int totalReceived() {
+        synchronized long totalReceived() {
             return totalReceived;
         }
 
         void log() {
-            int numToReceive;
-            List<Integer> list = new ArrayList<>(LOG_NUM_MESSAGES);
+            long numToReceive;
+            List<Long> list = new ArrayList<>(LOG_NUM_MESSAGES);
             synchronized (this) {
                 numToReceive = pending.size();
-                for (Iterator<Integer> iter = pending.iterator();
+                for (Iterator<Long> iter = pending.iterator();
                         iter.hasNext() && (list.size() < LOG_NUM_MESSAGES); ) {
-                    Integer i = iter.next();
+                    Long i = iter.next();
                     list.add(i);
                 }
             }
@@ -311,7 +329,7 @@ public class RoundTripWorker implements TaskWorker {
             this.props = new Properties();
             props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
             props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
-            props.put(ConsumerConfig.GROUP_ID_CONFIG, "round-trip-consumer-group-1");
+            props.put(ConsumerConfig.GROUP_ID_CONFIG, "round-trip-consumer-group-" + id);
             props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
             props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
             props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
@@ -341,9 +359,16 @@ public class RoundTripWorker implements TaskWorker {
                             if (toReceiveTracker.removePending(messageIndex)) {
                                 uniqueMessagesReceived++;
                                 if (uniqueMessagesReceived >= spec.maxMessages()) {
-                                    log.info("{}: Consumer received the full count of {} unique messages.  " +
-                                        "Waiting for all sends to be acked...", id, spec.maxMessages());
-                                    unackedSends.await();
+                                    try {
+                                        lock.lock();
+                                        log.info("{}: Consumer received the full count of {} unique messages.  " +
+                                            "Waiting for all {} sends to be acked...", id, spec.maxMessages(), unackedSends);
+                                        while (unackedSends > 0)
+                                            unackedSendsAreZero.await();
+                                    } finally {
+                                        lock.unlock();
+                                    }
+
                                     log.info("{}: all sends have been acked.", id);
                                     new StatusUpdater().update();
                                     doneFuture.complete("");
@@ -360,6 +385,8 @@ public class RoundTripWorker implements TaskWorker {
                         log.debug("{}: Consumer got WakeupException", id, e);
                     } catch (TimeoutException e) {
                         log.debug("{}: Consumer got TimeoutException", id, e);
+                    } finally {
+                        lock.unlock();
                     }
                 }
             } catch (Throwable e) {
@@ -415,9 +442,9 @@ public class RoundTripWorker implements TaskWorker {
     @Override
     public void stop(Platform platform) throws Exception {
         if (!running.compareAndSet(true, false)) {
-            throw new IllegalStateException("ProduceBenchWorker is not running.");
+            throw new IllegalStateException("RoundTripWorker is not running.");
         }
-        log.info("{}: Deactivating RoundTripWorkloadWorker.", id);
+        log.info("{}: Deactivating RoundTripWorker.", id);
         doneFuture.complete("");
         executor.shutdownNow();
         executor.awaitTermination(1, TimeUnit.DAYS);
@@ -428,5 +455,6 @@ public class RoundTripWorker implements TaskWorker {
         this.unackedSends = null;
         this.executor = null;
         this.doneFuture = null;
+        log.info("{}: Deactivated RoundTripWorker.", id);
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
index 42e09ee..fd30e8e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
@@ -36,7 +36,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
     private final int targetMessagesPerSec;
     private final PayloadGenerator valueGenerator;
     private final TopicsSpec activeTopics;
-    private final int maxMessages;
+    private final long maxMessages;
     private final Map<String, String> commonClientConf;
     private final Map<String, String> producerConf;
     private final Map<String, String> consumerConf;
@@ -54,7 +54,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
              @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
              @JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
              @JsonProperty("activeTopics") TopicsSpec activeTopics,
-             @JsonProperty("maxMessages") int maxMessages) {
+             @JsonProperty("maxMessages") long maxMessages) {
         super(startMs, durationMs);
         this.clientNode = clientNode == null ? "" : clientNode;
         this.bootstrapServers = bootstrapServers == null ? "" : bootstrapServers;
@@ -96,7 +96,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
     }
 
     @JsonProperty
-    public int maxMessages() {
+    public long maxMessages() {
         return maxMessages;
     }
 


Mime
View raw message