kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: remove ZK from system tests
Date Wed, 18 Jan 2017 02:14:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 73b7ae001 -> e3bdc84d8


MINOR: remove ZK from system tests

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2391 from mjsax/kafka-4060-zk-follow-up-system-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e3bdc84d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e3bdc84d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e3bdc84d

Branch: refs/heads/trunk
Commit: e3bdc84d864a5ca655f985473770f7b7cb24b79b
Parents: 73b7ae0
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Tue Jan 17 18:14:43 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jan 17 18:14:43 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/perf/SimpleBenchmark.java     | 26 +++++++++-----------
 .../streams/smoketest/ShutdownDeadlockTest.java |  7 +-----
 .../streams/smoketest/SmokeTestClient.java      |  8 +++---
 .../streams/smoketest/SmokeTestDriver.java      |  8 +++---
 .../streams/smoketest/StreamsSmokeTest.java     | 10 +++-----
 5 files changed, 23 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e3bdc84d/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 7ba6161..fb26206 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -57,7 +57,6 @@ import java.util.Random;
 public class SimpleBenchmark {
 
     private final String kafka;
-    private final String zookeeper;
     private final File stateDir;
 
     private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
@@ -91,18 +90,16 @@ public class SimpleBenchmark {
     private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
     private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
 
-    public SimpleBenchmark(File stateDir, String kafka, String zookeeper) {
+    public SimpleBenchmark(File stateDir, String kafka) {
         super();
         this.stateDir = stateDir;
         this.kafka = kafka;
-        this.zookeeper = zookeeper;
     }
 
     public static void main(String[] args) throws Exception {
         String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        String zookeeper = args.length > 1 ? args[1] : "localhost:2181";
-        String stateDirStr = args.length > 2 ? args[2] : "/tmp/kafka-streams-simple-benchmark";
-        numRecords = args.length > 3 ? Integer.parseInt(args[3]) : 10000000;
+        String stateDirStr = args.length > 1 ? args[1] : "/tmp/kafka-streams-simple-benchmark";
+        numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000;
         endKey = numRecords - 1;
 
         final File stateDir = new File(stateDirStr);
@@ -113,11 +110,10 @@ public class SimpleBenchmark {
         // Note: this output is needed for automated tests and must not be removed
         System.out.println("SimpleBenchmark instance started");
         System.out.println("kafka=" + kafka);
-        System.out.println("zookeeper=" + zookeeper);
         System.out.println("stateDir=" + stateDir);
         System.out.println("numRecords=" + numRecords);
 
-        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, zookeeper);
+        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka);
 
         // producer performance
         benchmark.produce(SOURCE_TOPIC, VALUE_SIZE, "simple-benchmark-produce", numRecords,
true, numRecords, true);
@@ -239,7 +235,7 @@ public class SimpleBenchmark {
     public void processStream(String topic) {
         CountDownLatch latch = new CountDownLatch(1);
 
-        final KafkaStreams streams = createKafkaStreams(topic, stateDir, kafka, zookeeper,
latch);
+        final KafkaStreams streams = createKafkaStreams(topic, stateDir, kafka, latch);
 
         Thread thread = new Thread() {
             public void run() {
@@ -273,7 +269,7 @@ public class SimpleBenchmark {
     public void processStreamWithSink(String topic) {
         CountDownLatch latch = new CountDownLatch(1);
 
-        final KafkaStreams streams = createKafkaStreamsWithSink(topic, stateDir, kafka, zookeeper,
latch);
+        final KafkaStreams streams = createKafkaStreamsWithSink(topic, stateDir, kafka, latch);
 
         Thread thread = new Thread() {
             public void run() {
@@ -337,7 +333,7 @@ public class SimpleBenchmark {
     public void processStreamWithStateStore(String topic) {
         CountDownLatch latch = new CountDownLatch(1);
 
-        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka,
zookeeper, latch, false);
+        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka,
latch, false);
         internalProcessStreamWithStore(streams, latch, "Streams Performance [MB/sec read+store]:
");
 
     }
@@ -345,7 +341,7 @@ public class SimpleBenchmark {
     public void processStreamWithCachedStateStore(String topic) {
         CountDownLatch latch = new CountDownLatch(1);
 
-        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka,
zookeeper, latch, true);
+        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka,
latch, true);
 
         internalProcessStreamWithStore(streams, latch, "Streams Performance [MB/sec read+cache+store]:
");
     }
@@ -433,7 +429,7 @@ public class SimpleBenchmark {
         System.out.println("Consumer Performance [MB/sec read]: " + megaBytePerSec(endTime
- startTime));
     }
 
-    private KafkaStreams createKafkaStreams(String topic, File stateDir, String kafka, String
zookeeper, final CountDownLatch latch) {
+    private KafkaStreams createKafkaStreams(String topic, File stateDir, String kafka, final
CountDownLatch latch) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
@@ -475,7 +471,7 @@ public class SimpleBenchmark {
         return new KafkaStreams(builder, props);
     }
 
-    private KafkaStreams createKafkaStreamsWithSink(String topic, File stateDir, String kafka,
String zookeeper, final CountDownLatch latch) {
+    private KafkaStreams createKafkaStreamsWithSink(String topic, File stateDir, String kafka,
final CountDownLatch latch) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-sink");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
@@ -565,7 +561,7 @@ public class SimpleBenchmark {
         return new KafkaStreams(builder, streamConfig);
     }
 
-    private KafkaStreams createKafkaStreamsWithStateStore(String topic, File stateDir, String
kafka, String zookeeper,
+    private KafkaStreams createKafkaStreamsWithStateStore(String topic, File stateDir, String
kafka,
                                                           final CountDownLatch latch,
                                                           boolean enableCaching) {
         Properties props = new Properties();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3bdc84d/streams/src/test/java/org/apache/kafka/streams/smoketest/ShutdownDeadlockTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/ShutdownDeadlockTest.java
b/streams/src/test/java/org/apache/kafka/streams/smoketest/ShutdownDeadlockTest.java
index 7abbd0d..942e0c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/ShutdownDeadlockTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/ShutdownDeadlockTest.java
@@ -33,13 +33,9 @@ import java.util.concurrent.TimeUnit;
 public class ShutdownDeadlockTest {
 
     private final String kafka;
-    private final String zookeeper;
-
-    public ShutdownDeadlockTest(final String kafka,
-                                final String zookeeper) {
 
+    public ShutdownDeadlockTest(final String kafka) {
         this.kafka = kafka;
-        this.zookeeper = zookeeper;
     }
 
     public void start() {
@@ -47,7 +43,6 @@ public class ShutdownDeadlockTest {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "shouldNotDeadlock");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
         final KStreamBuilder builder = new KStreamBuilder();
         final KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(),
topic);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3bdc84d/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 9f4dbd9..374cc2f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -37,20 +37,18 @@ import java.util.concurrent.TimeUnit;
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String kafka;
-    private final String zookeeper;
     private final File stateDir;
     private KafkaStreams streams;
     private Thread thread;
 
-    public SmokeTestClient(File stateDir, String kafka, String zookeeper) {
+    public SmokeTestClient(File stateDir, String kafka) {
         super();
         this.stateDir = stateDir;
         this.kafka = kafka;
-        this.zookeeper = zookeeper;
     }
 
     public void start() {
-        streams = createKafkaStreams(stateDir, kafka, zookeeper);
+        streams = createKafkaStreams(stateDir, kafka);
         streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(Thread t, Throwable e) {
@@ -75,7 +73,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         }
     }
 
-    private static KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper)
{
+    private static KafkaStreams createKafkaStreams(File stateDir, String kafka) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3bdc84d/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
index 2ce7e1a..33464f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
@@ -94,10 +94,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
             }
         };
 
-        SmokeTestClient streams1 = new SmokeTestClient(createDir(stateDir, "1"), kafka, zookeeper);
-        SmokeTestClient streams2 = new SmokeTestClient(createDir(stateDir, "2"), kafka, zookeeper);
-        SmokeTestClient streams3 = new SmokeTestClient(createDir(stateDir, "3"), kafka, zookeeper);
-        SmokeTestClient streams4 = new SmokeTestClient(createDir(stateDir, "4"), kafka, zookeeper);
+        SmokeTestClient streams1 = new SmokeTestClient(createDir(stateDir, "1"), kafka);
+        SmokeTestClient streams2 = new SmokeTestClient(createDir(stateDir, "2"), kafka);
+        SmokeTestClient streams3 = new SmokeTestClient(createDir(stateDir, "3"), kafka);
+        SmokeTestClient streams4 = new SmokeTestClient(createDir(stateDir, "4"), kafka);
 
         System.out.println("starting the driver");
         driver.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3bdc84d/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
index ce0bd2b..31dd325 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
@@ -32,13 +32,11 @@ public class StreamsSmokeTest {
     public static void main(String[] args) throws Exception {
         String command = args[0];
         String kafka = args.length > 1 ? args[1] : null;
-        String zookeeper = args.length > 2 ? args[2] : null;
-        String stateDir = args.length > 3 ? args[3] : null;
+        String stateDir = args.length > 2 ? args[2] : null;
 
-        System.out.println("StreamsSmokeTest instance started");
+        System.out.println("StreamsTest instance started");
         System.out.println("command=" + command);
         System.out.println("kafka=" + kafka);
-        System.out.println("zookeeper=" + zookeeper);
         System.out.println("stateDir=" + stateDir);
 
         switch (command) {
@@ -54,7 +52,7 @@ public class StreamsSmokeTest {
                 break;
             case "process":
                 // this starts a KafkaStreams client
-                final SmokeTestClient client = new SmokeTestClient(new File(stateDir), kafka,
zookeeper);
+                final SmokeTestClient client = new SmokeTestClient(new File(stateDir), kafka);
                 client.start();
 
                 Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -65,7 +63,7 @@ public class StreamsSmokeTest {
                 });
                 break;
             case "close-deadlock-test":
-                final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka, zookeeper);
+                final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka);
                 test.start();
                 break;
             default:


Mime
View raw message