kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: Trogdor: Added commonClientConf and adminClientConf to workload specs (#4757)
Date Fri, 06 Apr 2018 18:21:47 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new da32db9  Trogdor: Added commonClientConf and adminClientConf to workload specs (#4757)
da32db9 is described below

commit da32db9f3462242082f23973dc154f6f5e69f069
Author: Anna Povzner <anna@confluent.io>
AuthorDate: Fri Apr 6 11:21:41 2018 -0700

    Trogdor: Added commonClientConf and adminClientConf to workload specs (#4757)
    
    Currently, WorkerUtils can create topics only when there is no security. To be
able to work with secure Kafka, WorkerUtils.createTopics() needs to be able to take security
configs. This PR adds commonClientConf field to both producer bench and roundtrip workload
specs so that users can specify security and other common configs once for producer/consumer
and adminClient. Also added adminClientConf field to workload specs so that users can specify
adminClient specific configs if t [...]
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Colin P. Mccabe <cmccabe@confluent.io>,
Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../apache/kafka/trogdor/common/WorkerUtils.java   | 33 +++++++++++--
 .../org/apache/kafka/trogdor/task/TaskSpec.java    |  6 +++
 .../kafka/trogdor/workload/ProduceBenchSpec.java   | 19 +++++++-
 .../kafka/trogdor/workload/ProduceBenchWorker.java | 12 +++--
 .../kafka/trogdor/workload/RoundTripWorker.java    |  6 ++-
 .../trogdor/workload/RoundTripWorkloadSpec.java    | 33 +++++++++++++
 .../trogdor/common/JsonSerializationTest.java      |  4 +-
 .../kafka/trogdor/common/WorkerUtilsTest.java      | 54 ++++++++++++++++++++++
 8 files changed, 154 insertions(+), 13 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
index 99c13c0..98dbf38 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -74,6 +74,23 @@ public final class WorkerUtils {
         return (int) perPeriod;
     }
 
+    /**
+     * Adds all properties from commonConf and then from clientConf to given 'props' (in
+     * that order, over-writing properties with the same keys).
+     * @param props              Properties object that may contain zero or more properties
+     * @param commonConf         Map with common client properties
+     * @param clientConf         Map with client properties
+     */
+    public static void addConfigsToProperties(
+        Properties props, Map<String, String> commonConf, Map<String, String>
clientConf) {
+        for (Map.Entry<String, String> commonEntry : commonConf.entrySet()) {
+            props.setProperty(commonEntry.getKey(), commonEntry.getValue());
+        }
+        for (Map.Entry<String, String> entry : clientConf.entrySet()) {
+            props.setProperty(entry.getKey(), entry.getValue());
+        }
+    }
+
     private static final int CREATE_TOPICS_REQUEST_TIMEOUT = 25000;
     private static final int CREATE_TOPICS_CALL_TIMEOUT = 180000;
     private static final int MAX_CREATE_TOPICS_BATCH_SIZE = 10;
@@ -85,6 +102,9 @@ public final class WorkerUtils {
      *
      * @param log               The logger to use.
      * @param bootstrapServers  The bootstrap server list.
+     * @param commonClientConf  Common client config
+     * @param adminClientConf   AdminClient config. This config has precedence over fields
in
+     *                          common client config.
      * @param topics            Maps topic names to partition assignments.
      * @param failOnExisting    If true, the method will throw TopicExistsException if one
or
      *                          more topics already exist. Otherwise, the existing topics
are
@@ -93,12 +113,14 @@ public final class WorkerUtils {
      *                          number of partitions, the method throws RuntimeException.
      */
     public static void createTopics(
-        Logger log, String bootstrapServers,
+        Logger log, String bootstrapServers, Map<String, String> commonClientConf,
+        Map<String, String> adminClientConf,
         Map<String, NewTopic> topics, boolean failOnExisting) throws Throwable {
         // this method wraps the call to createTopics() that takes admin client, so that
we can
         // unit test the functionality with MockAdminClient. The exception is caught and
         // re-thrown so that admin client is closed when the method returns.
-        try (AdminClient adminClient = createAdminClient(bootstrapServers)) {
+        try (AdminClient adminClient
+                 = createAdminClient(bootstrapServers, commonClientConf, adminClientConf))
{
             createTopics(log, adminClient, topics, failOnExisting);
         } catch (Exception e) {
             log.warn("Failed to create or verify topics {}", topics, e);
@@ -227,10 +249,15 @@ public final class WorkerUtils {
         }
     }
 
-    private static AdminClient createAdminClient(String bootstrapServers) {
+    private static AdminClient createAdminClient(
+        String bootstrapServers, Map<String, String> commonClientConf,
+        Map<String, String> adminClientConf) {
         Properties props = new Properties();
         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, CREATE_TOPICS_REQUEST_TIMEOUT);
+        // first add common client config, and then admin client config to properties, possibly
+        // over-writing default or common properties.
+        addConfigsToProperties(props, commonClientConf, adminClientConf);
         return AdminClient.create(props);
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java
index 84ed75a..af7a76f 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java
@@ -21,6 +21,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.kafka.trogdor.common.JsonUtil;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
 
 
@@ -102,4 +104,8 @@ public abstract class TaskSpec {
     public String toString() {
         return JsonUtil.toJsonString(this);
     }
+
+    protected Map<String, String> configOrEmptyMap(Map<String, String> config)
{
+        return (config == null) ? Collections.<String, String>emptyMap() : config;
+    }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index 7b1bedd..ec6e309 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -26,7 +26,6 @@ import org.apache.kafka.trogdor.task.TaskWorker;
 
 import java.util.Collections;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.Set;
 
 /**
@@ -45,6 +44,8 @@ public class ProduceBenchSpec extends TaskSpec {
     private final PayloadGenerator keyGenerator;
     private final PayloadGenerator valueGenerator;
     private final Map<String, String> producerConf;
+    private final Map<String, String> adminClientConf;
+    private final Map<String, String> commonClientConf;
     private final int totalTopics;
     private final int activeTopics;
     private final String topicPrefix;
@@ -61,6 +62,8 @@ public class ProduceBenchSpec extends TaskSpec {
                          @JsonProperty("keyGenerator") PayloadGenerator keyGenerator,
                          @JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
                          @JsonProperty("producerConf") Map<String, String> producerConf,
+                         @JsonProperty("commonClientConf") Map<String, String> commonClientConf,
+                         @JsonProperty("adminClientConf") Map<String, String> adminClientConf,
                          @JsonProperty("totalTopics") int totalTopics,
                          @JsonProperty("activeTopics") int activeTopics,
                          @JsonProperty("topicPrefix") String topicPrefix,
@@ -75,7 +78,9 @@ public class ProduceBenchSpec extends TaskSpec {
             new SequentialPayloadGenerator(4, 0) : keyGenerator;
         this.valueGenerator = valueGenerator == null ?
             new ConstantPayloadGenerator(512, new byte[0]) : valueGenerator;
-        this.producerConf = (producerConf == null) ? new TreeMap<String, String>()
: producerConf;
+        this.producerConf = configOrEmptyMap(producerConf);
+        this.commonClientConf = configOrEmptyMap(commonClientConf);
+        this.adminClientConf = configOrEmptyMap(adminClientConf);
         this.totalTopics = totalTopics;
         this.activeTopics = activeTopics;
         this.topicPrefix = (topicPrefix == null) ? DEFAULT_TOPIC_PREFIX : topicPrefix;
@@ -121,6 +126,16 @@ public class ProduceBenchSpec extends TaskSpec {
     }
 
     @JsonProperty
+    public Map<String, String> commonClientConf() {
+        return commonClientConf;
+    }
+
+    @JsonProperty
+    public Map<String, String> adminClientConf() {
+        return adminClientConf;
+    }
+
+    @JsonProperty
     public int totalTopics() {
         return totalTopics;
     }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index e291bae..a891b83 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -109,9 +109,11 @@ public class ProduceBenchWorker implements TaskWorker {
                 Map<String, NewTopic> newTopics = new HashMap<>();
                 for (int i = 0; i < spec.totalTopics(); i++) {
                     String name = topicIndexToName(i);
-                    newTopics.put(name, new NewTopic(name, spec.numPartitions(), spec.replicationFactor()));
+                    newTopics.put(name, new NewTopic(name, spec.numPartitions(),
+                                                     spec.replicationFactor()));
                 }
-                WorkerUtils.createTopics(log, spec.bootstrapServers(), newTopics, false);
+                WorkerUtils.createTopics(log, spec.bootstrapServers(), spec.commonClientConf(),
+                                         spec.adminClientConf(), newTopics, false);
 
                 executor.submit(new SendRecords());
             } catch (Throwable e) {
@@ -182,9 +184,9 @@ public class ProduceBenchWorker implements TaskWorker {
                 new StatusUpdater(histogram), 1, 1, TimeUnit.MINUTES);
             Properties props = new Properties();
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
-            for (Map.Entry<String, String> entry : spec.producerConf().entrySet())
{
-                props.setProperty(entry.getKey(), entry.getValue());
-            }
+            // add common client configs to producer properties, and then user-specified
producer
+            // configs
+            WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.producerConf());
             this.producer = new KafkaProducer<>(props, new ByteArraySerializer(), new
ByteArraySerializer());
             this.keys = new PayloadIterator(spec.keyGenerator());
             this.values = new PayloadIterator(spec.valueGenerator());
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index a05785c..08b11ac 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -124,7 +124,7 @@ public class RoundTripWorker implements TaskWorker {
                     throw new ConfigException("Invalid null or empty partitionAssignments.");
                 }
                 WorkerUtils.createTopics(
-                    log, spec.bootstrapServers(),
+                    log, spec.bootstrapServers(), spec.commonClientConf(), spec.adminClientConf(),
                     Collections.singletonMap(TOPIC_NAME,
                                              new NewTopic(TOPIC_NAME, spec.partitionAssignments())),
                     true);
@@ -184,6 +184,8 @@ public class RoundTripWorker implements TaskWorker {
             props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer." + id);
             props.put(ProducerConfig.ACKS_CONFIG, "all");
             props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
+            // user may over-write the defaults with common client config and producer config
+            WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.producerConf());
             producer = new KafkaProducer<>(props, new ByteArraySerializer(),
                 new ByteArraySerializer());
             int perPeriod = WorkerUtils.
@@ -275,6 +277,8 @@ public class RoundTripWorker implements TaskWorker {
             props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
             props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
             props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
+            // user may over-write the defaults with common client config and consumer config
+            WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf());
             consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
                 new ByteArrayDeserializer());
             consumer.subscribe(Collections.singleton(TOPIC_NAME));
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
index 00bd833..3d0e3ef 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
@@ -26,6 +26,7 @@ import org.apache.kafka.trogdor.task.TaskWorker;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
@@ -41,12 +42,20 @@ public class RoundTripWorkloadSpec extends TaskSpec {
     private final NavigableMap<Integer, List<Integer>> partitionAssignments;
     private final PayloadGenerator valueGenerator;
     private final int maxMessages;
+    private final Map<String, String> commonClientConf;
+    private final Map<String, String> producerConf;
+    private final Map<String, String> consumerConf;
+    private final Map<String, String> adminClientConf;
 
     @JsonCreator
     public RoundTripWorkloadSpec(@JsonProperty("startMs") long startMs,
              @JsonProperty("durationMs") long durationMs,
              @JsonProperty("clientNode") String clientNode,
              @JsonProperty("bootstrapServers") String bootstrapServers,
+             @JsonProperty("commonClientConf") Map<String, String> commonClientConf,
+             @JsonProperty("adminClientConf") Map<String, String> adminClientConf,
+             @JsonProperty("consumerConf") Map<String, String> consumerConf,
+             @JsonProperty("producerConf") Map<String, String> producerConf,
              @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
              @JsonProperty("partitionAssignments") NavigableMap<Integer, List<Integer>>
partitionAssignments,
              @JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
@@ -60,6 +69,10 @@ public class RoundTripWorkloadSpec extends TaskSpec {
         this.valueGenerator = valueGenerator == null ?
             new UniformRandomPayloadGenerator(32, 123, 10) : valueGenerator;
         this.maxMessages = maxMessages;
+        this.commonClientConf = configOrEmptyMap(commonClientConf);
+        this.adminClientConf = configOrEmptyMap(adminClientConf);
+        this.producerConf = configOrEmptyMap(producerConf);
+        this.consumerConf = configOrEmptyMap(consumerConf);
     }
 
     @JsonProperty
@@ -92,6 +105,26 @@ public class RoundTripWorkloadSpec extends TaskSpec {
         return maxMessages;
     }
 
+    @JsonProperty
+    public Map<String, String> commonClientConf() {
+        return commonClientConf;
+    }
+
+    @JsonProperty
+    public Map<String, String> adminClientConf() {
+        return adminClientConf;
+    }
+
+    @JsonProperty
+    public Map<String, String> producerConf() {
+        return producerConf;
+    }
+
+    @JsonProperty
+    public Map<String, String> consumerConf() {
+        return consumerConf;
+    }
+
     @Override
     public TaskController newController(String id) {
         return new TaskController() {
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index dee7614..76b206b 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -49,8 +49,8 @@ public class JsonSerializationTest {
         verify(new WorkerRunning(null, 0, null));
         verify(new WorkerStopping(null, 0, null));
         verify(new ProduceBenchSpec(0, 0, null, null,
-            0, 0, null, null, null, 0, 0, "test-topic", 1, (short) 3));
-        verify(new RoundTripWorkloadSpec(0, 0, null, null,
+            0, 0, null, null, null, null, null, 0, 0, "test-topic", 1, (short) 3));
+        verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, null,
             0, null, null, 0));
         verify(new SampleTaskSpec(0, 0, 0, null));
     }
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
index 22b7846..fbe2389 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.trogdor.common;
 
 
 import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartitionInfo;
 
 import org.apache.kafka.common.Node;
@@ -41,6 +42,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 
 public class WorkerUtilsTest {
@@ -208,4 +210,56 @@ public class WorkerUtilsTest {
         assertEquals(0, adminClient.listTopics().names().get().size());
     }
 
+    @Test
+    public void testAddConfigsToPropertiesAddsAllConfigs() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ProducerConfig.ACKS_CONFIG, "all");
+
+        Properties resultProps = new Properties();
+        resultProps.putAll(props);
+        resultProps.put(ProducerConfig.CLIENT_ID_CONFIG, "test-client");
+        resultProps.put(ProducerConfig.LINGER_MS_CONFIG, "1000");
+
+        WorkerUtils.addConfigsToProperties(
+            props,
+            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, "test-client"),
+            Collections.singletonMap(ProducerConfig.LINGER_MS_CONFIG, "1000"));
+        assertEquals(resultProps, props);
+    }
+
+    @Test
+    public void testCommonConfigOverwritesDefaultProps() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ProducerConfig.ACKS_CONFIG, "all");
+
+        Properties resultProps = new Properties();
+        resultProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        resultProps.put(ProducerConfig.ACKS_CONFIG, "1");
+        resultProps.put(ProducerConfig.LINGER_MS_CONFIG, "1000");
+
+        WorkerUtils.addConfigsToProperties(
+            props,
+            Collections.singletonMap(ProducerConfig.ACKS_CONFIG, "1"),
+            Collections.singletonMap(ProducerConfig.LINGER_MS_CONFIG, "1000"));
+        assertEquals(resultProps, props);
+    }
+
+    @Test
+    public void testClientConfigOverwritesBothDefaultAndCommonConfigs() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ProducerConfig.ACKS_CONFIG, "all");
+
+        Properties resultProps = new Properties();
+        resultProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        resultProps.put(ProducerConfig.ACKS_CONFIG, "0");
+
+        WorkerUtils.addConfigsToProperties(
+            props,
+            Collections.singletonMap(ProducerConfig.ACKS_CONFIG, "1"),
+            Collections.singletonMap(ProducerConfig.ACKS_CONFIG, "0"));
+        assertEquals(resultProps, props);
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.

Mime
View raw message