kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Pass a streams config to replace the single state dir (#4714)
Date Mon, 19 Mar 2018 21:17:11 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0f364cd  MINOR: Pass a streams config to replace the single state dir (#4714)
0f364cd is described below

commit 0f364cd53a7a56e4ddea1db44fd044860ba2c0b4
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Mon Mar 19 14:17:00 2018 -0700

    MINOR: Pass a streams config to replace the single state dir (#4714)
    
    This is a general change and is a prerequisite to allow streams benchmark test with different
streams tests. For the streams benchmark itself I will have a separate PR for switching configs.
Details:
    
    1. Create a "streams.properties" file under PERSISTENT_ROOT before all the streams tests.
For now it will only contain a single config of state.dir pointing to PERSISTENT_ROOT.
    
    2. For all the system test related code, replace the main function parameter of state.dir
with propsFilename, then inside the function load the props from the file and apply overrides
if necessary.
    
    3. Minor fixes.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
---
 .../java/org/apache/kafka/common/utils/Utils.java  | 13 +++--
 .../apache/kafka/streams/perf/SimpleBenchmark.java | 55 +++++++++++++---------
 .../apache/kafka/streams/perf/YahooBenchmark.java  |  6 +--
 .../streams/tests/BrokerCompatibilityTest.java     | 14 ++----
 .../apache/kafka/streams/tests/EosTestClient.java  | 14 ++----
 .../kafka/streams/tests/SmokeTestClient.java       | 13 ++---
 .../kafka/streams/tests/SmokeTestDriver.java       | 14 ++++--
 .../tests/StreamsBrokerDownResilienceTest.java     |  9 ++--
 .../apache/kafka/streams/tests/StreamsEosTest.java | 21 +++++----
 .../kafka/streams/tests/StreamsSmokeTest.java      | 21 +++++----
 .../streams/tests/StreamsStandByReplicaTest.java   | 22 +++++----
 tests/kafkatest/services/kafka/__init__.py         |  1 +
 tests/kafkatest/services/streams.py                | 28 ++++++++---
 13 files changed, 136 insertions(+), 95 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index c7a654a..ba621ab 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -25,7 +25,6 @@ import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintWriter;
@@ -505,11 +504,17 @@ public final class Utils {
      * Read a properties file from the given path
      * @param filename The path of the file to read
      */
-    public static Properties loadProps(String filename) throws IOException, FileNotFoundException
{
+    public static Properties loadProps(String filename) throws IOException {
         Properties props = new Properties();
-        try (InputStream propStream = new FileInputStream(filename)) {
-            props.load(propStream);
+
+        if (filename != null) {
+            try (InputStream propStream = new FileInputStream(filename)) {
+                props.load(propStream);
+            }
+        } else {
+            System.out.println("Did not load any properties since the property file is not
specified");
         }
+
         return props;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 592c0e1..5d7041e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -50,6 +51,7 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.TestUtils;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
@@ -77,10 +79,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class SimpleBenchmark {
 
     final String kafka;
-    private final File stateDir;
     final Boolean loadPhase;
     final String testName;
     final int numThreads;
+    final Properties props;
     static final String ALL_TESTS = "all";
     private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
     private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
@@ -119,10 +121,10 @@ public class SimpleBenchmark {
     private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
     private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
 
-    public SimpleBenchmark(final File stateDir, final String kafka, final Boolean loadPhase,
+    public SimpleBenchmark(final Properties props, final String kafka, final Boolean loadPhase,
                            final String testName, final int numRecords, final int numThreads)
{
         super();
-        this.stateDir = stateDir;
+        this.props = props;
         this.kafka = kafka;
         this.loadPhase = loadPhase;
         this.testName = testName;
@@ -193,13 +195,23 @@ public class SimpleBenchmark {
         }
     }
 
-    public static void main(String[] args) {
-        String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
-        int numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000;
-        boolean loadPhase = args.length > 3 ? Boolean.parseBoolean(args[3]) : false;
-        String testName = args.length > 4 ? args[4].toLowerCase(Locale.ROOT) : ALL_TESTS;
-        int numThreads = args.length > 5 ? Integer.parseInt(args[5]) : 1;
+    public static void main(String[] args) throws IOException {
+        final String kafka = args.length > 0 ? args[0] : "localhost:9092";
+        final String propFileName = args.length > 1 ? args[1] : null;
+        final int numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000;
+        final boolean loadPhase = args.length > 3 ? Boolean.parseBoolean(args[3]) : false;
+        final String testName = args.length > 4 ? args[4].toLowerCase(Locale.ROOT) : ALL_TESTS;
+        final int numThreads = args.length > 5 ? Integer.parseInt(args[5]) : 1;
+
+        final Properties props = Utils.loadProps(propFileName);
+
+        final String stateDirStr;
+        if (props.containsKey(StreamsConfig.STATE_DIR_CONFIG)) {
+            stateDirStr = props.get(StreamsConfig.STATE_DIR_CONFIG).toString();
+        } else {
+            stateDirStr = TestUtils.tempDirectory().getAbsolutePath();
+            props.put(StreamsConfig.STATE_DIR_CONFIG, stateDirStr);
+        }
 
         final File stateDir = new File(stateDirStr);
         stateDir.mkdir();
@@ -207,20 +219,18 @@ public class SimpleBenchmark {
         // Note: this output is needed for automated tests and must not be removed
         System.out.println("StreamsTest instance started");
         System.out.println("kafka=" + kafka);
-        System.out.println("stateDir=" + stateDir);
+        System.out.println("streamsProperties=" + props);
         System.out.println("numRecords=" + numRecords);
         System.out.println("loadPhase=" + loadPhase);
         System.out.println("testName=" + testName);
         System.out.println("numThreads=" + numThreads);
 
-        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, loadPhase, testName,
numRecords, numThreads);
+        SimpleBenchmark benchmark = new SimpleBenchmark(props, kafka, loadPhase, testName,
numRecords, numThreads);
         benchmark.run();
     }
 
-    public Properties setStreamProperties(final String applicationId) {
-        Properties props = new Properties();
+    public void setStreamProperties(final String applicationId) {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -234,7 +244,6 @@ public class SimpleBenchmark {
         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
         //TODO remove this config or set to smaller value when KIP-91 is merged
         props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
60000);
-        return props;
     }
 
     private Properties setProduceConsumeProperties(final String clientId) {
@@ -313,7 +322,7 @@ public class SimpleBenchmark {
         }
 
         CountDownLatch latch = new CountDownLatch(1);
-        Properties props = setStreamProperties("simple-benchmark-count");
+        setStreamProperties("simple-benchmark-count");
         final KafkaStreams streams = createCountStreams(props, countTopic, latch);
         runGenericBenchmark(streams, "Streams Count Performance [records/latency/rec-sec/MB-sec
counted]: ", latch);
     }
@@ -331,7 +340,7 @@ public class SimpleBenchmark {
         CountDownLatch latch = new CountDownLatch(1);
 
         // setup join
-        Properties props = setStreamProperties("simple-benchmark-kstream-ktable-join");
+        setStreamProperties("simple-benchmark-kstream-ktable-join");
         final KafkaStreams streams = createKafkaStreamsKStreamKTableJoin(props, kStreamTopic,
kTableTopic, latch);
 
         // run benchmark
@@ -351,7 +360,7 @@ public class SimpleBenchmark {
         CountDownLatch latch = new CountDownLatch(1);
 
         // setup join
-        Properties props = setStreamProperties("simple-benchmark-kstream-kstream-join");
+        setStreamProperties("simple-benchmark-kstream-kstream-join");
         final KafkaStreams streams = createKafkaStreamsKStreamKStreamJoin(props, kStreamTopic1,
kStreamTopic2, latch);
 
         // run benchmark
@@ -370,7 +379,7 @@ public class SimpleBenchmark {
         CountDownLatch latch = new CountDownLatch(1);
 
         // setup join
-        Properties props = setStreamProperties("simple-benchmark-ktable-ktable-join");
+        setStreamProperties("simple-benchmark-ktable-ktable-join");
         final KafkaStreams streams = createKafkaStreamsKTableKTableJoin(props, kTableTopic1,
kTableTopic2, latch);
 
         // run benchmark
@@ -586,7 +595,7 @@ public class SimpleBenchmark {
     }
 
     private KafkaStreams createKafkaStreams(String topic, final CountDownLatch latch) {
-        Properties props = setStreamProperties("simple-benchmark-streams");
+        setStreamProperties("simple-benchmark-streams");
 
         StreamsBuilder builder = new StreamsBuilder();
 
@@ -625,7 +634,7 @@ public class SimpleBenchmark {
     }
 
     private KafkaStreams createKafkaStreamsWithSink(String topic, final CountDownLatch latch)
{
-        final Properties props = setStreamProperties("simple-benchmark-streams-with-sink");
+        setStreamProperties("simple-benchmark-streams-with-sink");
 
         StreamsBuilder builder = new StreamsBuilder();
 
@@ -724,7 +733,7 @@ public class SimpleBenchmark {
     private KafkaStreams createKafkaStreamsWithStateStore(String topic,
                                                           final CountDownLatch latch,
                                                           boolean enableCaching) {
-        Properties props = setStreamProperties("simple-benchmark-streams-with-store" + enableCaching);
+        setStreamProperties("simple-benchmark-streams-with-store" + enableCaching);
 
         StreamsBuilder builder = new StreamsBuilder();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index 9490101..f63a71f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -197,11 +197,11 @@ public class YahooBenchmark {
         }
 
         CountDownLatch latch = new CountDownLatch(1);
-        Properties props = parent.setStreamProperties("simple-benchmark-yahoo" + new Random().nextInt());
+        parent.setStreamProperties("simple-benchmark-yahoo" + new Random().nextInt());
         //TODO remove this config or set to smaller value when KIP-91 is merged
-        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
60000);
+        parent.props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
60000);
 
-        final KafkaStreams streams = createYahooBenchmarkStreams(props, campaignsTopic, eventsTopic,
latch, parent.numRecords);
+        final KafkaStreams streams = createYahooBenchmarkStreams(parent.props, campaignsTopic,
eventsTopic, latch, parent.numRecords);
         parent.runGenericBenchmark(streams, "Streams Yahoo Performance [records/latency/rec-sec/MB-sec
counted]: ", latch);
 
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index af76304..37d0cb6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -28,15 +28,15 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.test.TestUtils;
 
-import java.io.File;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Locale;
 import java.util.Properties;
@@ -47,20 +47,16 @@ public class BrokerCompatibilityTest {
     private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic";
     private static final String SINK_TOPIC = "brokerCompatibilitySinkTopic";
 
-    public static void main(final String[] args) {
+    public static void main(final String[] args) throws IOException {
         System.out.println("StreamsTest instance started");
 
         final String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        final String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
+        final String propFileName = args.length > 1 ? args[1] : null;
         final boolean eosEnabled = args.length > 2 ? Boolean.parseBoolean(args[2]) : false;
 
-        final File stateDir = new File(stateDirStr);
-        stateDir.mkdir();
-
-        final Properties streamsProperties = new Properties();
+        final Properties streamsProperties = Utils.loadProps(propFileName);
         streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-system-test-broker-compatibility");
-        streamsProperties.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index 2bf90d6..ecc3b91 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.state.KeyValueStore;
 
-import java.io.File;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,17 +38,17 @@ public class EosTestClient extends SmokeTestUtil {
 
     static final String APP_ID = "EosTest";
     private final String kafka;
-    private final File stateDir;
+    private final Properties properties;
     private final boolean withRepartitioning;
     private final AtomicBoolean notRunningCallbackReceived = new AtomicBoolean(false);
 
     private KafkaStreams streams;
     private boolean uncaughtException;
 
-    EosTestClient(final String kafka, final File stateDir, final boolean withRepartitioning)
{
+    EosTestClient(final String kafka, final Properties properties, final boolean withRepartitioning)
{
         super();
         this.kafka = kafka;
-        this.stateDir = stateDir;
+        this.properties = properties;
         this.withRepartitioning = withRepartitioning;
     }
 
@@ -80,7 +79,7 @@ public class EosTestClient extends SmokeTestUtil {
             if (streams == null) {
                 uncaughtException = false;
 
-                streams = createKafkaStreams(stateDir, kafka);
+                streams = createKafkaStreams(properties, kafka);
                 streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
{
                     @Override
                     public void uncaughtException(final Thread t, final Throwable e) {
@@ -113,11 +112,9 @@ public class EosTestClient extends SmokeTestUtil {
         }
     }
 
-    private KafkaStreams createKafkaStreams(final File stateDir,
+    private KafkaStreams createKafkaStreams(final Properties props,
                                             final String kafka) {
-        final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
-        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
@@ -129,7 +126,6 @@ public class EosTestClient extends SmokeTestUtil {
         //TODO remove this config or set to smaller value when KIP-91 is merged
         props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
60000);
 
-
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, Integer> data = builder.stream("data");
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 727c421..e2493b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -35,26 +35,25 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.state.Stores;
 
-import java.io.File;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String kafka;
-    private final File stateDir;
+    private final Properties streamsProperties;
     private KafkaStreams streams;
     private Thread thread;
     private boolean uncaughtException = false;
 
-    public SmokeTestClient(File stateDir, String kafka) {
+    public SmokeTestClient(final Properties streamsProperties, final String kafka) {
         super();
-        this.stateDir = stateDir;
         this.kafka = kafka;
+        this.streamsProperties = streamsProperties;
     }
 
     public void start() {
-        streams = createKafkaStreams(stateDir, kafka);
+        streams = createKafkaStreams(streamsProperties, kafka);
         streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(Thread t, Throwable e) {
@@ -94,10 +93,8 @@ public class SmokeTestClient extends SmokeTestUtil {
         }
     }
 
-    private static KafkaStreams createKafkaStreams(File stateDir, String kafka) {
-        final Properties props = new Properties();
+    private static KafkaStreams createKafkaStreams(final Properties props, final String kafka)
{
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
-        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 882e9c0..a3f520a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.test.TestUtils;
 
 import java.io.File;
@@ -94,10 +95,15 @@ public class SmokeTestDriver extends SmokeTestUtil {
             }
         };
 
-        SmokeTestClient streams1 = new SmokeTestClient(createDir(stateDir, "1"), kafka);
-        SmokeTestClient streams2 = new SmokeTestClient(createDir(stateDir, "2"), kafka);
-        SmokeTestClient streams3 = new SmokeTestClient(createDir(stateDir, "3"), kafka);
-        SmokeTestClient streams4 = new SmokeTestClient(createDir(stateDir, "4"), kafka);
+        final Properties props = new Properties();
+        props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "1").getAbsolutePath());
+        SmokeTestClient streams1 = new SmokeTestClient(props, kafka);
+        props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "2").getAbsolutePath());
+        SmokeTestClient streams2 = new SmokeTestClient(props, kafka);
+        props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "3").getAbsolutePath());
+        SmokeTestClient streams3 = new SmokeTestClient(props, kafka);
+        props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "4").getAbsolutePath());
+        SmokeTestClient streams4 = new SmokeTestClient(props, kafka);
 
         System.out.println("starting the driver");
         driver.start();
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
index ed4cd27..5219c95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
@@ -21,13 +21,14 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -43,17 +44,17 @@ public class StreamsBrokerDownResilienceTest {
 
     private static final String SINK_TOPIC = "streamsResilienceSink";
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws IOException {
 
         System.out.println("StreamsTest instance started");
 
         final String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        final String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
+        final String propFileName = args.length > 1 ? args[1] : null;
         final String additionalConfigs = args.length > 2 ? args[2] : null;
 
         final Serde<String> stringSerde = Serdes.String();
 
-        final Properties streamsProperties = new Properties();
+        final Properties streamsProperties = Utils.loadProps(propFileName);
         streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-resilience");
         streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
index 4921143..c5195cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
@@ -16,26 +16,31 @@
  */
 package org.apache.kafka.streams.tests;
 
-import java.io.File;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Properties;
 
 public class StreamsEosTest {
 
     /**
-     *  args ::= command kafka zookeeper stateDir
+     *  args ::= kafka propFileName command
      *  command := "run" | "process" | "verify"
      */
-    public static void main(final String[] args) {
+    public static void main(final String[] args) throws IOException {
         final String kafka = args[0];
-        final String stateDir = args.length > 1 ? args[1] : null;
+        final String propFileName = args.length > 1 ? args[1] : null;
         final String command = args.length > 2 ? args[2] : null;
 
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
         System.out.println("StreamsTest instance started");
         System.out.println("kafka=" + kafka);
-        System.out.println("stateDir=" + stateDir);
+        System.out.println("props=" + streamsProperties);
         System.out.println("command=" + command);
         System.out.flush();
 
-        if (command == null || stateDir == null) {
+        if (command == null || propFileName == null) {
             System.exit(-1);
         }
 
@@ -44,10 +49,10 @@ public class StreamsEosTest {
                 EosTestDriver.generate(kafka);
                 break;
             case "process":
-                new EosTestClient(kafka, new File(stateDir), false).start();
+                new EosTestClient(kafka, streamsProperties, false).start();
                 break;
             case "process-complex":
-                new EosTestClient(kafka, new File(stateDir), true).start();
+                new EosTestClient(kafka, streamsProperties, true).start();
                 break;
             case "verify":
                 EosTestDriver.verify(kafka, false);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index 64597bd..27aba29 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -16,27 +16,32 @@
  */
 package org.apache.kafka.streams.tests;
 
-import java.io.File;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 public class StreamsSmokeTest {
 
     /**
-     *  args ::= command kafka zookeeper stateDir
+     *  args ::= kafka propFileName command
      *  command := "run" | "process"
      *
      * @param args
      */
-    public static void main(String[] args) throws InterruptedException {
-        String kafka = args[0];
-        String stateDir = args.length > 1 ? args[1] : null;
-        String command = args.length > 2 ? args[2] : null;
+    public static void main(final String[] args) throws InterruptedException, IOException
{
+        final String kafka = args[0];
+        final String propFileName = args.length > 1 ? args[1] : null;
+        final String command = args.length > 2 ? args[2] : null;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
 
         System.out.println("StreamsTest instance started");
         System.out.println("command=" + command);
         System.out.println("kafka=" + kafka);
-        System.out.println("stateDir=" + stateDir);
+        System.out.println("props=" + streamsProperties);
 
         switch (command) {
             case "standalone":
@@ -51,7 +56,7 @@ public class StreamsSmokeTest {
                 break;
             case "process":
                 // this starts a KafkaStreams client
-                final SmokeTestClient client = new SmokeTestClient(new File(stateDir), kafka);
+                final SmokeTestClient client = new SmokeTestClient(streamsProperties, kafka);
                 client.start();
                 break;
             case "close-deadlock-test":
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
index 0e3aa13..1f44d61 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -32,8 +33,8 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -41,27 +42,32 @@ import java.util.concurrent.TimeUnit;
 
 public class StreamsStandByReplicaTest {
 
-    public static void main(String[] args) {
+    public static void main(final String[] args) throws IOException {
 
         System.out.println("StreamsTest instance started");
 
         final String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        final String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
+        final String propFileName = args.length > 1 ? args[1] : null;
         final String additionalConfigs = args.length > 2 ? args[2] : null;
 
         final Serde<String> stringSerde = Serdes.String();
 
-        final Properties streamsProperties = new Properties();
-        streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        final Properties streamsProperties = Utils.loadProps(propFileName);
         streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks");
-        streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-        streamsProperties.put(StreamsConfig.STATE_DIR_CONFIG, stateDirStr);
         streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
 
+        if (additionalConfigs == null) {
+            System.err.println("additional configs are not provided");
+            System.err.flush();
+            System.exit(1);
+        }
+
         final Map<String, String> updated = SystemTestUtil.parseConfigs(additionalConfigs);
         System.out.println("Updating configs with " + updated);
 
diff --git a/tests/kafkatest/services/kafka/__init__.py b/tests/kafkatest/services/kafka/__init__.py
index cd29e4b..5abbbee 100644
--- a/tests/kafkatest/services/kafka/__init__.py
+++ b/tests/kafkatest/services/kafka/__init__.py
@@ -15,3 +15,4 @@
 
 from kafka import KafkaService
 from util import TopicPartition
+from config import KafkaConfig
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index e886c94..6da5a25 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -19,13 +19,19 @@ import signal
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.kafka import KafkaConfig
 
 
+STATE_DIR = "state.dir"
+
 class StreamsTestBaseService(KafkaPathResolverMixin, Service):
+
     """Base class for Streams Test services providing some common settings and functionality"""
 
     PERSISTENT_ROOT = "/mnt/streams"
+
     # The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately
+    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "streams.properties")
     LOG_FILE = os.path.join(PERSISTENT_ROOT, "streams.log")
     STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "streams.stdout")
     STDERR_FILE = os.path.join(PERSISTENT_ROOT, "streams.stderr")
@@ -112,7 +118,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
     def start_cmd(self, node):
         args = self.args.copy()
         args['kafka'] = self.kafka.bootstrap_servers()
-        args['state_dir'] = self.PERSISTENT_ROOT
+        args['config_file'] = self.CONFIG_FILE
         args['stdout'] = self.STDOUT_FILE
         args['stderr'] = self.STDERR_FILE
         args['pidfile'] = self.PID_FILE
@@ -121,15 +127,21 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
 
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
-              " %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
+              " %(kafka)s %(config_file)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
               " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
-        self.logger.info("Executing: " + cmd)
+
+        self.logger.info("Executing Streams cmd: " + cmd)
 
         return cmd
 
-    def start_node(self, node):
-        node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
+    def prop_file(self, node):
+        cfg = KafkaConfig(**{STATE_DIR: self.PERSISTENT_ROOT})
+        return cfg.render()
 
+    def start_node(self, node):
+        node.account.mkdirs(self.PERSISTENT_ROOT)
+        prop_file = self.prop_file(node)
+        node.account.create_file(self.CONFIG_FILE, prop_file)
         node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE))
 
         self.logger.info("Starting StreamsTest process on " + str(node.account))
@@ -223,7 +235,7 @@ class StreamsBrokerDownResilienceService(StreamsTestBaseService):
     def start_cmd(self, node):
         args = self.args.copy()
         args['kafka'] = self.kafka.bootstrap_servers(validate=False)
-        args['state_dir'] = self.PERSISTENT_ROOT
+        args['config_file'] = self.CONFIG_FILE
         args['stdout'] = self.STDOUT_FILE
         args['stderr'] = self.STDERR_FILE
         args['pidfile'] = self.PID_FILE
@@ -232,9 +244,11 @@ class StreamsBrokerDownResilienceService(StreamsTestBaseService):
 
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
-              " %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
+              " %(kafka)s %(config_file)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
               " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
         self.logger.info("Executing: " + cmd)
+
         return cmd
 
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message