kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7183: Add a trogdor test that creates many connections to brokers (#5393)
Date Mon, 06 Aug 2018 07:47:36 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 609c81e  KAFKA-7183: Add a trogdor test that creates many connections to brokers (#5393)
609c81e is described below

commit 609c81ec8b190f4812e7008b30fc509ee1656d68
Author: Colin Patrick McCabe <colin@cmccabe.xyz>
AuthorDate: Mon Aug 6 00:47:25 2018 -0700

    KAFKA-7183: Add a trogdor test that creates many connections to brokers (#5393)
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>
---
 checkstyle/import-control.xml                      |   1 +
 .../trogdor/workload/ConnectionStressSpec.java     |  96 +++++++++
 .../trogdor/workload/ConnectionStressWorker.java   | 232 +++++++++++++++++++++
 .../apache/kafka/trogdor/workload/Throttle.java    |  12 +-
 4 files changed, 338 insertions(+), 3 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 35f42e3..840e551 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -194,6 +194,7 @@
     <allow pkg="javax.servlet" />
     <allow pkg="javax.ws.rs" />
     <allow pkg="net.sourceforge.argparse4j" />
+    <allow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.clients.admin" />
     <allow pkg="org.apache.kafka.clients.consumer" exact-match="true"/>
     <allow pkg="org.apache.kafka.clients.producer" exact-match="true"/>
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
new file mode 100644
index 0000000..4195f9b
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.common.Topology;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The specification for a task which connects and disconnects many times a
+ * second to stress the broker.
+ */
+public class ConnectionStressSpec extends TaskSpec {
+    private final String clientNode;
+    private final String bootstrapServers;
+    private final Map<String, String> commonClientConf;
+    private final int targetConnectionsPerSec;
+    private final int numThreads;
+
+    @JsonCreator
+    public ConnectionStressSpec(@JsonProperty("startMs") long startMs,
+            @JsonProperty("durationMs") long durationMs,
+            @JsonProperty("clientNode") String clientNode,
+            @JsonProperty("bootstrapServers") String bootstrapServers,
+            @JsonProperty("commonClientConf") Map<String, String> commonClientConf,
+            @JsonProperty("targetConnectionsPerSec") int targetConnectionsPerSec,
+            @JsonProperty("numThreads") int numThreads) {
+        super(startMs, durationMs);
+        this.clientNode = (clientNode == null) ? "" : clientNode;
+        this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
+        this.commonClientConf = configOrEmptyMap(commonClientConf);
+        this.targetConnectionsPerSec = targetConnectionsPerSec;
+        this.numThreads = numThreads < 1 ? 1 : numThreads;
+    }
+
+    @JsonProperty
+    public String clientNode() {
+        return clientNode;
+    }
+
+    @JsonProperty
+    public String bootstrapServers() {
+        return bootstrapServers;
+    }
+
+    @JsonProperty
+    public Map<String, String> commonClientConf() {
+        return commonClientConf;
+    }
+
+    @JsonProperty
+    public int targetConnectionsPerSec() {
+        return targetConnectionsPerSec;
+    }
+
+    @JsonProperty
+    public int numThreads() {
+        return numThreads;
+    }
+
+    public TaskController newController(String id) {
+        return new TaskController() {
+            @Override
+            public Set<String> targetNodes(Topology topology) {
+                return Collections.singleton(clientNode);
+            }
+        };
+    }
+
+    @Override
+    public TaskWorker newTaskWorker(String id) {
+        return new ConnectionStressWorker(id, this);
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
new file mode 100644
index 0000000..5d78de8
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NetworkClientUtils;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.common.WorkerUtils;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ConnectionStressWorker implements TaskWorker {
+    private static final Logger log = LoggerFactory.getLogger(ConnectionStressWorker.class);
+
+    private static final int THROTTLE_PERIOD_MS = 100;
+
+    private static final int REPORT_INTERVAL_MS = 20000;
+
+    private final String id;
+
+    private final ConnectionStressSpec spec;
+
+    private final AtomicBoolean running = new AtomicBoolean(false);
+
+    private KafkaFutureImpl<String> doneFuture;
+
+    private WorkerStatusTracker status;
+
+    private long totalConnections;
+
+    private long totalFailedConnections;
+
+    private long startTimeMs;
+
+    private Throttle throttle;
+
+    private long nextReportTime;
+
+    private ExecutorService workerExecutor;
+
+    public ConnectionStressWorker(String id, ConnectionStressSpec spec) {
+        this.id = id;
+        this.spec = spec;
+    }
+
+    @Override
+    public void start(Platform platform, WorkerStatusTracker status,
+                      KafkaFutureImpl<String> doneFuture) throws Exception {
+        if (!running.compareAndSet(false, true)) {
+            throw new IllegalStateException("ConnectionStressWorker is already running.");
+        }
+        log.info("{}: Activating ConnectionStressWorker with {}", id, spec);
+        this.doneFuture = doneFuture;
+        this.status = status;
+        this.totalConnections = 0;
+        this.totalFailedConnections  = 0;
+        this.startTimeMs = Time.SYSTEM.milliseconds();
+        this.throttle = new ConnectStressThrottle(WorkerUtils.
+            perSecToPerPeriod(spec.targetConnectionsPerSec(), THROTTLE_PERIOD_MS));
+        this.nextReportTime = 0;
+        this.workerExecutor = Executors.newFixedThreadPool(spec.numThreads(),
+            ThreadUtils.createThreadFactory("ConnectionStressWorkerThread%d", false));
+        for (int i = 0; i < spec.numThreads(); i++) {
+            this.workerExecutor.submit(new ConnectLoop());
+        }
+    }
+
+    private static class ConnectStressThrottle extends Throttle {
+        ConnectStressThrottle(int maxPerPeriod) {
+            super(maxPerPeriod, THROTTLE_PERIOD_MS);
+        }
+    }
+
+    public class ConnectLoop implements Runnable {
+        @Override
+        public void run() {
+            try {
+                Properties props = new Properties();
+                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
+                WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.commonClientConf());
+                AdminClientConfig conf = new AdminClientConfig(props);
+                List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+                    conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+                ManualMetadataUpdater updater = new ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes());
+                while (true) {
+                    if (doneFuture.isDone()) {
+                        break;
+                    }
+                    throttle.increment();
+                    long lastTimeMs = throttle.lastTimeMs();
+                    boolean success = attemptConnection(conf, updater);
+                    synchronized (ConnectionStressWorker.this) {
+                        totalConnections++;
+                        if (!success) {
+                            totalFailedConnections++;
+                        }
+                        if (lastTimeMs > nextReportTime) {
+                            status.update(JsonUtil.JSON_SERDE.valueToTree(
+                                new StatusData(totalConnections,
+                                    totalFailedConnections,
+                                    (totalConnections * 1000.0) / (lastTimeMs - startTimeMs))));
+                            nextReportTime = lastTimeMs + REPORT_INTERVAL_MS;
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                WorkerUtils.abort(log, "ConnectionStressRunnable", e, doneFuture);
+            }
+        }
+
+        private boolean attemptConnection(AdminClientConfig conf,
+                                          ManualMetadataUpdater updater) throws Exception {
+            try {
+                List<Node> nodes = updater.fetchNodes();
+                Node targetNode = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
+                try (ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(conf)) {
+                    try (Metrics metrics = new Metrics()) {
+                        LogContext logContext = new LogContext();
+                        try (Selector selector = new Selector(conf.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
+                            metrics, Time.SYSTEM, "", channelBuilder, logContext)) {
+                            try (NetworkClient client = new NetworkClient(selector,
+                                    updater,
+                                    "ConnectionStressWorker",
+                                    1,
+                                    1000,
+                                    1000,
+                                    4096,
+                                    4096,
+                                    1000,
+                                    Time.SYSTEM,
+                                    false,
+                                    new ApiVersions(),
+                                    logContext)) {
+                                NetworkClientUtils.awaitReady(client, targetNode, Time.SYSTEM, 100);
+                            }
+                        }
+                    }
+                }
+                return true;
+            } catch (IOException e) {
+                return false;
+            }
+        }
+    }
+
+    public static class StatusData {
+        private final long totalConnections;
+        private final long totalFailedConnections;
+        private final double connectsPerSec;
+
+        @JsonCreator
+        StatusData(@JsonProperty("totalConnections") long totalConnections,
+                   @JsonProperty("totalFailedConnections") long totalFailedConnections,
+                   @JsonProperty("connectsPerSec") double connectsPerSec) {
+            this.totalConnections = totalConnections;
+            this.totalFailedConnections = totalFailedConnections;
+            this.connectsPerSec = connectsPerSec;
+        }
+
+        @JsonProperty
+        public long totalConnections() {
+            return totalConnections;
+        }
+
+        @JsonProperty
+        public long totalFailedConnections() {
+            return totalFailedConnections;
+        }
+
+        @JsonProperty
+        public double connectsPerSec() {
+            return connectsPerSec;
+        }
+    }
+
+    @Override
+    public void stop(Platform platform) throws Exception {
+        if (!running.compareAndSet(true, false)) {
+            throw new IllegalStateException("ConnectionStressWorker is not running.");
+        }
+        log.info("{}: Deactivating ConnectionStressWorker.", id);
+        doneFuture.complete("");
+        workerExecutor.shutdownNow();
+        workerExecutor.awaitTermination(1, TimeUnit.DAYS);
+        this.workerExecutor = null;
+        this.status = null;
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java
index 41f9d02..6a99c02 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java
@@ -24,12 +24,14 @@ public class Throttle {
     private final int periodMs;
     private int count;
     private long prevPeriod;
+    private long lastTimeMs;
 
     Throttle(int maxPerPeriod, int periodMs) {
         this.maxPerPeriod = maxPerPeriod;
         this.periodMs = periodMs;
         this.count = maxPerPeriod;
         this.prevPeriod = -1;
+        this.lastTimeMs = 0;
     }
 
     synchronized public boolean increment() throws InterruptedException {
@@ -39,11 +41,11 @@ public class Throttle {
                 count++;
                 return throttled;
             }
-            long now = time().milliseconds();
-            long curPeriod = now / periodMs;
+            lastTimeMs = time().milliseconds();
+            long curPeriod = lastTimeMs / periodMs;
             if (curPeriod <= prevPeriod) {
                 long nextPeriodMs = (curPeriod + 1) * periodMs;
-                delay(nextPeriodMs - now);
+                delay(nextPeriodMs - lastTimeMs);
                 throttled = true;
             } else {
                 prevPeriod = curPeriod;
@@ -52,6 +54,10 @@ public class Throttle {
         }
     }
 
+    public synchronized long lastTimeMs() {
+        return lastTimeMs;
+    }
+
     protected Time time() {
         return Time.SYSTEM;
     }


Mime
View raw message