kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7428: ConnectionStressSpec: add "action", allow multiple clients (#5668)
Date Wed, 26 Sep 2018 09:09:09 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 888423e  KAFKA-7428: ConnectionStressSpec: add "action", allow multiple clients (#5668)
888423e is described below

commit 888423ee569d27876468c83243be6dab10a57dc9
Author: Colin Patrick McCabe <colin@cmccabe.xyz>
AuthorDate: Wed Sep 26 02:08:48 2018 -0700

    KAFKA-7428: ConnectionStressSpec: add "action", allow multiple clients (#5668)
---
 .../trogdor/workload/ConnectionStressSpec.java     | 31 +++++++++++++++++-----
 .../trogdor/workload/ConnectionStressWorker.java   | 22 ++++++++++++++-
 2 files changed, 45 insertions(+), 8 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
index 4195f9b..c22396f 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
@@ -24,40 +24,52 @@ import org.apache.kafka.trogdor.task.TaskController;
 import org.apache.kafka.trogdor.task.TaskSpec;
 import org.apache.kafka.trogdor.task.TaskWorker;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 
 /**
  * The specification for a task which connects and disconnects many times a
  * second to stress the broker.
  */
 public class ConnectionStressSpec extends TaskSpec {
-    private final String clientNode;
+    private final List<String> clientNodes;
     private final String bootstrapServers;
     private final Map<String, String> commonClientConf;
     private final int targetConnectionsPerSec;
     private final int numThreads;
+    private final ConnectionStressAction action;
+
+    enum ConnectionStressAction {
+        CONNECT,
+        FETCH_METADATA
+    }
 
     @JsonCreator
     public ConnectionStressSpec(@JsonProperty("startMs") long startMs,
             @JsonProperty("durationMs") long durationMs,
-            @JsonProperty("clientNode") String clientNode,
+            @JsonProperty("clientNode") List<String> clientNodes,
             @JsonProperty("bootstrapServers") String bootstrapServers,
             @JsonProperty("commonClientConf") Map<String, String> commonClientConf,
             @JsonProperty("targetConnectionsPerSec") int targetConnectionsPerSec,
-            @JsonProperty("numThreads") int numThreads) {
+            @JsonProperty("numThreads") int numThreads,
+            @JsonProperty("action") ConnectionStressAction action) {
         super(startMs, durationMs);
-        this.clientNode = (clientNode == null) ? "" : clientNode;
+        this.clientNodes = (clientNodes == null) ? Collections.emptyList() :
+            Collections.unmodifiableList(new ArrayList<>(clientNodes));
         this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
         this.commonClientConf = configOrEmptyMap(commonClientConf);
         this.targetConnectionsPerSec = targetConnectionsPerSec;
         this.numThreads = numThreads < 1 ? 1 : numThreads;
+        this.action = (action == null) ? ConnectionStressAction.CONNECT : action;
     }
 
     @JsonProperty
-    public String clientNode() {
-        return clientNode;
+    public List<String> clientNode() {
+        return clientNodes;
     }
 
     @JsonProperty
@@ -80,11 +92,16 @@ public class ConnectionStressSpec extends TaskSpec {
         return numThreads;
     }
 
+    @JsonProperty
+    public ConnectionStressAction action() {
+        return action;
+    }
+
     public TaskController newController(String id) {
         return new TaskController() {
             @Override
             public Set<String> targetNodes(Topology topology) {
-                return Collections.singleton(clientNode);
+                return new TreeSet<>(clientNodes);
             }
         };
     }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index 5d78de8..82d1d6c 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.ManualMetadataUpdater;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NetworkClientUtils;
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
@@ -132,7 +133,15 @@ public class ConnectionStressWorker implements TaskWorker {
                     }
                     throttle.increment();
                     long lastTimeMs = throttle.lastTimeMs();
-                    boolean success = attemptConnection(conf, updater);
+                    boolean success = false;
+                    switch (spec.action()) {
+                        case CONNECT:
+                            success = attemptConnection(conf, updater);
+                            break;
+                        case FETCH_METADATA:
+                            success = attemptMetadataFetch(props);
+                            break;
+                    }
                     synchronized (ConnectionStressWorker.this) {
                         totalConnections++;
                         if (!success) {
@@ -185,6 +194,17 @@ public class ConnectionStressWorker implements TaskWorker {
                 return false;
             }
         }
+
+        private boolean attemptMetadataFetch(Properties conf) {
+            try (AdminClient client = AdminClient.create(conf)) {
+                client.describeCluster().nodes().get();
+            } catch (RuntimeException e) {
+                return false;
+            } catch (Exception e) {
+                return false;
+            }
+            return true;
+        }
     }
 
     public static class StatusData {


Mime
View raw message