kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4594; Annotate integration tests and provide gradle build targets to run subsets of tests
Date Tue, 21 Mar 2017 10:13:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 05690f0c8 -> fef7fca2a


KAFKA-4594; Annotate integration tests and provide gradle build targets to run subsets of tests

This uses JUnit Categories to identify integration tests. Adds 2 new build targets:
`integrationTest` and `unitTest`.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska <eno@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #2695 from dguy/junit-categories


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fef7fca2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fef7fca2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fef7fca2

Branch: refs/heads/trunk
Commit: fef7fca2af7fd00bc6b0889062da3f8b56b2224e
Parents: 05690f0
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Mar 21 09:55:46 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Mar 21 10:12:36 2017 +0000

----------------------------------------------------------------------
 README.md                                       |  32 ++--
 build.gradle                                    |  49 ++++-
 .../org/apache/kafka/test/IntegrationTest.java  |  20 ++
 .../kafka/api/IntegrationTestHarness.scala      |   2 +-
 .../integration/KafkaServerTestHarness.scala    |   2 +-
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |   8 +-
 jenkins.sh                                      |  20 ++
 .../apache/kafka/streams/KafkaStreamsTest.java  |   3 +
 .../integration/FanoutIntegrationTest.java      |  18 +-
 .../GlobalKTableIntegrationTest.java            |   3 +
 .../InternalTopicIntegrationTest.java           |   3 +
 .../integration/JoinIntegrationTest.java        |   3 +
 .../KStreamAggregationDedupIntegrationTest.java |   3 +
 .../KStreamAggregationIntegrationTest.java      |   3 +
 .../KStreamKTableJoinIntegrationTest.java       |  33 ++--
 .../integration/KStreamRepartitionJoinTest.java |  29 ++-
 ...eamsFineGrainedAutoResetIntegrationTest.java |   3 +
 .../KTableKTableJoinIntegrationTest.java        | 184 ++++++++++---------
 .../QueryableStateIntegrationTest.java          |  43 ++---
 .../integration/RegexSourceIntegrationTest.java |   3 +
 .../integration/ResetIntegrationTest.java       |   3 +
 21 files changed, 289 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 9c2413b..64d5f3c 100644
--- a/README.md
+++ b/README.md
@@ -14,38 +14,42 @@ Java 7 should be used for building in order to support both Java 7 and Java 8 at
 
 Now everything else will work.
 
-### Building a jar and running it ###
+### Build a jar and run it ###
     ./gradlew jar
 
 Follow instructions in http://kafka.apache.org/documentation.html#quickstart
 
-### Building source jar ###
+### Build source jar ###
     ./gradlew srcJar
 
-### Building aggregated javadoc ###
+### Build aggregated javadoc ###
     ./gradlew aggregatedJavadoc
 
-### Building javadoc and scaladoc ###
+### Build javadoc and scaladoc ###
     ./gradlew javadoc
     ./gradlew javadocJar # builds a javadoc jar for each module
     ./gradlew scaladoc
     ./gradlew scaladocJar # builds a scaladoc jar for each module
     ./gradlew docsJar # builds both (if applicable) javadoc and scaladoc jars for each module
 
-### Running unit tests ###
-    ./gradlew test
-
-### Forcing re-running unit tests w/o code change ###
+### Run unit/integration tests ###
+    ./gradlew test # runs both unit and integration tests
+    ./gradlew unitTest
+    ./gradlew integrationTest
+    
+### Force re-running tests without code change ###
     ./gradlew cleanTest test
+    ./gradlew cleanTest unitTest
+    ./gradlew cleanTest integrationTest
 
-### Running a particular unit test ###
+### Running a particular unit/integration test ###
     ./gradlew -Dtest.single=RequestResponseSerializationTest core:test
 
-### Running a particular test method within a unit test ###
+### Running a particular test method within a unit/integration test ###
     ./gradlew core:test --tests kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic
     ./gradlew clients:test --tests org.apache.kafka.clients.MetadataTest.testMetadataUpdateWaitTime
 
-### Running a particular unit test with log4j output ###
+### Running a particular unit/integration test with log4j output ###
 Change the log4j setting in either `clients/src/test/resources/log4j.properties` or `core/src/test/resources/log4j.properties`
 
     ./gradlew -i -Dtest.single=RequestResponseSerializationTest core:test
@@ -103,7 +107,7 @@ to avoid known issues with this configuration.
 ### Building the jar for all scala versions and for all projects ###
     ./gradlew jarAll
 
-### Running unit tests for all scala versions and for all projects ###
+### Running unit/integration tests for all scala versions and for all projects ###
     ./gradlew testAll
 
 ### Building a binary release gzipped tar ball for all scala versions ###
@@ -136,7 +140,7 @@ Please note for this to work you should create/update `${GRADLE_USER_HOME}/gradl
 ### Running code quality checks ###
 There are two code quality analysis tools that we regularly run, findbugs and checkstyle.
 
-#### Checkstyle
+#### Checkstyle ####
 Checkstyle enforces a consistent coding style in Kafka.
 You can run checkstyle using:
 
@@ -145,7 +149,7 @@ You can run checkstyle using:
 The checkstyle warnings will be found in `reports/checkstyle/reports/main.html` and `reports/checkstyle/reports/test.html` files in the
 subproject build directories. They are also are printed to the console. The build will fail if Checkstyle fails.
 
-#### Findbugs
+#### Findbugs ####
 Findbugs uses static analysis to look for bugs in the code.
 You can run findbugs using:
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 57beebe..c8f4088 100644
--- a/build.gradle
+++ b/build.gradle
@@ -168,6 +168,10 @@ subprojects {
     }
   }
 
+  def testLoggingEvents = ["passed", "skipped", "failed"]
+  def testShowStandardStreams = false
+  def testExceptionFormat = 'full'
+
   test {
     maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors()
 
@@ -176,11 +180,45 @@ subprojects {
     jvmArgs = maxPermSizeArgs
 
     testLogging {
-      events = userTestLoggingEvents ?: ["passed", "skipped", "failed"]
-      showStandardStreams = userShowStandardStreams ?: false
-      exceptionFormat = 'full'
+      events = userTestLoggingEvents ?: testLoggingEvents
+      showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
+      exceptionFormat = testExceptionFormat
+    }
+
+  }
+
+  task integrationTest(type: Test, dependsOn: compileJava) {
+    maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors()
+
+    minHeapSize = "256m"
+    maxHeapSize = "2048m"
+    jvmArgs = maxPermSizeArgs
+
+    testLogging {
+      events = userTestLoggingEvents ?: testLoggingEvents
+      showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
+      exceptionFormat = testExceptionFormat
+    }
+    useJUnit {
+      includeCategories 'org.apache.kafka.test.IntegrationTest'
     }
+  }
 
+  task unitTest(type: Test, dependsOn: compileJava) {
+    maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors()
+
+    minHeapSize = "256m"
+    maxHeapSize = "2048m"
+    jvmArgs = maxPermSizeArgs
+
+    testLogging {
+      events = userTestLoggingEvents ?: testLoggingEvents
+      showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
+      exceptionFormat = testExceptionFormat
+    }
+    useJUnit {
+      excludeCategories 'org.apache.kafka.test.IntegrationTest'
+    }
   }
 
   jar {
@@ -892,6 +930,7 @@ project(':connect:api') {
     testCompile libs.junit
 
     testRuntime libs.slf4jlog4j
+    testCompile project(':clients').sourceSets.test.output
   }
 
   javadoc {
@@ -929,6 +968,7 @@ project(':connect:transforms') {
     testCompile libs.powermockEasymock
 
     testRuntime libs.slf4jlog4j
+    testCompile project(':clients').sourceSets.test.output
   }
 
   javadoc {
@@ -966,6 +1006,7 @@ project(':connect:json') {
     testCompile libs.powermockEasymock
 
     testRuntime libs.slf4jlog4j
+    testCompile project(':clients').sourceSets.test.output
   }
 
   javadoc {
@@ -1016,6 +1057,7 @@ project(':connect:runtime') {
     testCompile libs.powermockEasymock
 
     testCompile project(":connect:json")
+    testCompile project(':clients').sourceSets.test.output
 
     testRuntime libs.slf4jlog4j
   }
@@ -1068,6 +1110,7 @@ project(':connect:file') {
     testCompile libs.powermockEasymock
 
     testRuntime libs.slf4jlog4j
+    testCompile project(':clients').sourceSets.test.output
   }
 
   javadoc {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/clients/src/test/java/org/apache/kafka/test/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/IntegrationTest.java b/clients/src/test/java/org/apache/kafka/test/IntegrationTest.java
new file mode 100644
index 0000000..c73a681
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/IntegrationTest.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+public interface IntegrationTest {
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 5c8ceea..ef113fb 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -33,7 +33,7 @@ import scala.collection.mutable.Buffer
 /**
  * A helper class for writing integration tests that involve producers, consumers, and servers
  */
-trait IntegrationTestHarness extends KafkaServerTestHarness {
+abstract class IntegrationTestHarness extends KafkaServerTestHarness {
 
   val producerCount: Int
   val consumerCount: Int

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 9f40ec6..af3133a 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.network.ListenerName
 /**
  * A test harness that brings up some number of broker nodes
  */
-trait KafkaServerTestHarness extends ZooKeeperTestHarness {
+abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
   var instanceConfigs: Seq[KafkaConfig] = null
   var servers: Buffer[KafkaServer] = null
   var brokerList: String = null

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 519b6fa..5d58036 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -18,12 +18,16 @@
 package kafka.zk
 
 import javax.security.auth.login.Configuration
-import kafka.utils.{ZkUtils, Logging, CoreUtils}
+
+import kafka.utils.{CoreUtils, Logging, ZkUtils}
 import org.junit.{After, Before}
 import org.scalatest.junit.JUnitSuite
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.test.IntegrationTest
+import org.junit.experimental.categories.Category
 
-trait ZooKeeperTestHarness extends JUnitSuite with Logging {
+@Category(Array(classOf[IntegrationTest]))
+abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
 
   val zkConnectionTimeout = 10000
   val zkSessionTimeout = 6000

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/jenkins.sh
----------------------------------------------------------------------
diff --git a/jenkins.sh b/jenkins.sh
new file mode 100755
index 0000000..0eec2e5
--- /dev/null
+++ b/jenkins.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This script is used for verifying changes in Jenkins. In order to provide faster feedback, the tasks are ordered so
+# that faster tasks are executed in every module before slower tasks (if possible). For example, the unit tests for all
+# the modules are executed before the integration tests.
+./gradlew clean compileJava compileScala compileTestJava compileTestScala checkstyleMain checkstyleTest unitTest integrationTest --no-daemon -Dorg.gradle.project.testLoggingEvents=started,passed,skipped,failed "$@"

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index d4bf471..eebbde9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -28,12 +28,14 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -47,6 +49,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+@Category({IntegrationTest.class})
 public class KafkaStreamsTest {
 
     private static final int NUM_BROKERS = 1;

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 421efc7..25dea92 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -31,13 +31,11 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.IntegrationTest;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.experimental.categories.Category;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -67,7 +65,7 @@ import static org.junit.Assert.assertThat;
  * }
  * </pre>
  */
-@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
 public class FanoutIntegrationTest {
     private static final int NUM_BROKERS = 1;
     private static final long COMMIT_INTERVAL_MS = 300L;
@@ -86,15 +84,6 @@ public class FanoutIntegrationTest {
         CLUSTER.createTopic(OUTPUT_TOPIC_C);
     }
 
-    @Parameter
-    public long cacheSizeBytes;
-
-    //Single parameter, use Object[]
-    @Parameters
-    public static Object[] data() {
-        return new Object[] {0, 10 * 1024 * 1024L};
-    }
-
     @Test
     public void shouldFanoutTheInput() throws Exception {
         final List<String> inputValues = Arrays.asList("Hello", "World");
@@ -116,7 +105,6 @@ public class FanoutIntegrationTest {
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
 
         final KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);
         final KStream<byte[], String> stream2 = stream1.mapValues(

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 4d9b365..676b464 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -35,12 +35,14 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -48,6 +50,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+@Category({IntegrationTest.class})
 public class GlobalKTableIntegrationTest {
     private static final int NUM_BROKERS = 1;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 3a9d843..4b558f4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -38,12 +38,14 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import scala.Tuple2;
 import scala.collection.Iterator;
 import scala.collection.Map;
@@ -60,6 +62,7 @@ import static org.junit.Assert.assertTrue;
 /**
  * Tests related to internal topics in streams
  */
+@Category({IntegrationTest.class})
 public class InternalTopicIntegrationTest {
     private static final int NUM_BROKERS = 1;
     @ClassRule

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 6f77716..5263456 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -43,6 +44,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -58,6 +60,7 @@ import static org.hamcrest.core.Is.is;
 /**
  * Tests all available joins of Kafka Streams DSL.
  */
+@Category({IntegrationTest.class})
 public class JoinIntegrationTest {
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 7b14cf8..415f593 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -51,6 +52,7 @@ import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
 import kafka.utils.MockTime;
+import org.junit.experimental.categories.Category;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
@@ -59,6 +61,7 @@ import static org.hamcrest.core.Is.is;
  * Similar to KStreamAggregationIntegrationTest but with dedupping enabled
  * by virtue of having a large commit interval
  */
+@Category({IntegrationTest.class})
 public class KStreamAggregationDedupIntegrationTest {
     private static final int NUM_BROKERS = 1;
     private static final long COMMIT_INTERVAL_MS = 300L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 4eb582c..f42ad56 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -45,12 +45,14 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -69,6 +71,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertFalse;
 
+@Category({IntegrationTest.class})
 public class KStreamAggregationIntegrationTest {
     private static final int NUM_BROKERS = 1;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index e60530d..d566041 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -37,15 +37,13 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -59,8 +57,7 @@ import static org.junit.Assert.assertThat;
  * End-to-end integration test that demonstrates how to perform a join between a KStream and a
  * KTable (think: KStream.leftJoin(KTable)), i.e. an example of a stateful computation.
  */
-
-@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
 public class KStreamKTableJoinIntegrationTest {
     private static final int NUM_BROKERS = 1;
     private static final long COMMIT_INTERVAL_MS = 300L;
@@ -95,7 +92,7 @@ public class KStreamKTableJoinIntegrationTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
             TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
+
 
     }
 
@@ -107,16 +104,6 @@ public class KStreamKTableJoinIntegrationTest {
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     }
 
-    @Parameter
-    public long cacheSizeBytes;
-
-    //Single parameter, use Object[]
-    @Parameters
-    public static Object[] data() {
-        return new Object[] {0, 10 * 1024 * 1024L};
-
-    }
-
     /**
      * Tuple for a region and its associated number of clicks.
      */
@@ -147,7 +134,17 @@ public class KStreamKTableJoinIntegrationTest {
     }
 
     @Test
-    public void shouldCountClicksPerRegion() throws Exception {
+    public void shouldCountClicksPerRegionWithZeroByteCache() throws Exception {
+        countClicksPerRegion(0);
+    }
+
+    @Test
+    public void shouldCountClicksPerRegionWithNonZeroByteCache() throws Exception {
+        countClicksPerRegion(10 * 1024 * 1024);
+    }
+
+    private void countClicksPerRegion(final int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
         // Input 1: Clicks per user (multiple records allowed per user).
         final List<KeyValue<String, Long>> userClicks = Arrays.asList(
             new KeyValue<>("alice", 13L),

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 5d44255..7da1ffd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
@@ -44,10 +45,7 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -60,7 +58,7 @@ import java.util.concurrent.TimeUnit;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
-@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
 public class KStreamRepartitionJoinTest {
 
     private static final int NUM_BROKERS = 1;
@@ -87,15 +85,6 @@ public class KStreamRepartitionJoinTest {
     private String streamFourInput;
     private static volatile int testNo = 0;
 
-    @Parameter
-    public long cacheSizeBytes;
-
-    //Single parameter, use Object[]
-    @Parameters
-    public static Object[] data() {
-        return new Object[] {0, 10 * 1024 * 1024L};
-    }
-
     @Before
     public void before() throws InterruptedException {
         testNo++;
@@ -109,7 +98,6 @@ public class KStreamRepartitionJoinTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
 
         streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
         streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
@@ -127,8 +115,17 @@ public class KStreamRepartitionJoinTest {
     }
 
     @Test
-    public void shouldCorrectlyRepartitionOnJoinOperations() throws Exception {
+    public void shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache() throws Exception {
+        verifyRepartitionOnJoinOperations(0);
+    }
 
+    @Test
+    public void shouldCorrectlyRepartitionOnJoinOperationsWithNonZeroSizedCache() throws Exception {
+        verifyRepartitionOnJoinOperations(10 * 1024 * 1024);
+    }
+
+    private void verifyRepartitionOnJoinOperations(final int cacheSizeBytes) throws Exception {
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
         produceMessages();
         final ExpectedOutputOnTopic mapOne = mapStreamOneAndJoin();
         final ExpectedOutputOnTopic mapBoth = mapBothStreamsAndJoin();

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index c12b975..3028b6b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -37,6 +38,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -49,6 +51,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
+@Category({IntegrationTest.class})
 public class KStreamsFineGrainedAutoResetIntegrationTest {
 
     private static final int NUM_BROKERS = 1;

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index baeaf6f..ec40c17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -30,23 +30,24 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.experimental.categories.Category;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
-@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
 public class KTableKTableJoinIntegrationTest {
     private final static int NUM_BROKERS = 1;
 
@@ -61,71 +62,6 @@ public class KTableKTableJoinIntegrationTest {
     private KafkaStreams streams;
     private final static Properties CONSUMER_CONFIG = new Properties();
 
-    @Parameterized.Parameter(value = 0)
-    public JoinType joinType1;
-    @Parameterized.Parameter(value = 1)
-    public JoinType joinType2;
-    @Parameterized.Parameter(value = 2)
-    public List<KeyValue<String, String>> expectedResult;
-
-    //Single parameter, use Object[]
-    @Parameterized.Parameters
-    public static Object[] parameters() {
-        return new Object[][]{
-            {JoinType.INNER, JoinType.INNER, Arrays.asList(
-                new KeyValue<>("b", "B1-B2-B3")//,
-            )},
-            {JoinType.INNER, JoinType.LEFT, Arrays.asList(
-                new KeyValue<>("b", "B1-B2-B3")//,
-            )},
-            {JoinType.INNER, JoinType.OUTER, Arrays.asList(
-                new KeyValue<>("a", "null-A3"),
-                new KeyValue<>("b", "null-B3"),
-                new KeyValue<>("c", "null-C3"),
-                new KeyValue<>("b", "B1-B2-B3")//,
-            )},
-            {JoinType.LEFT, JoinType.INNER, Arrays.asList(
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3")//,
-            )},
-            {JoinType.LEFT, JoinType.LEFT, Arrays.asList(
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3")//,
-            )},
-            {JoinType.LEFT, JoinType.OUTER, Arrays.asList(
-                new KeyValue<>("a", "null-A3"),
-                new KeyValue<>("b", "null-B3"),
-                new KeyValue<>("c", "null-C3"),
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3")//,
-            )},
-            {JoinType.OUTER, JoinType.INNER, Arrays.asList(
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", "null-C2-C3")
-            )},
-            {JoinType.OUTER, JoinType.LEFT, Arrays.asList(
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", "null-C2-C3")
-            )},
-            {JoinType.OUTER, JoinType.OUTER, Arrays.asList(
-                new KeyValue<>("a", "null-A3"),
-                new KeyValue<>("b", "null-B3"),
-                new KeyValue<>("c", "null-C3"),
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", "null-C2-C3")
-            )}
-        };
-    }
-
     @BeforeClass
     public static void beforeTest() throws Exception {
         CLUSTER.createTopic(TABLE_1);
@@ -194,7 +130,101 @@ public class KTableKTableJoinIntegrationTest {
         INNER, LEFT, OUTER
     }
 
-    private KafkaStreams prepareTopology() {
+
+    @Test
+    public void shouldInnerInnerJoin() throws Exception {
+        verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")));
+    }
+
+    @Test
+    public void shouldInnerLeftJoin() throws Exception {
+        verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")));
+    }
+
+    @Test
+    public void shouldInnerOuterJoin() throws Exception {
+        verifyKTableKTableJoin(JoinType.INNER, JoinType.OUTER, Arrays.asList(
+                new KeyValue<>("a", "null-A3"),
+                new KeyValue<>("b", "null-B3"),
+                new KeyValue<>("c", "null-C3"),
+                new KeyValue<>("b", "B1-B2-B3")));
+    }
+
+    @Test
+    public void shouldLeftInnerJoin() throws Exception {
+        verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList(
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3")));
+    }
+
+    @Test
+    public void shouldLeftLeftJoin() throws Exception {
+        verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList(
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3")));
+    }
+
+    @Test
+    public void shouldLeftOuterJoin() throws Exception {
+        verifyKTableKTableJoin(JoinType.LEFT, JoinType.OUTER, Arrays.asList(
+                new KeyValue<>("a", "null-A3"),
+                new KeyValue<>("b", "null-B3"),
+                new KeyValue<>("c", "null-C3"),
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3")));
+    }
+
+    @Test
+    public void shouldOuterInnerJoin() throws Exception {
+        verifyKTableKTableJoin(JoinType.OUTER, JoinType.INNER, Arrays.asList(
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", "null-C2-C3")));
+    }
+
+    @Test
+    public void shouldOuterLeftJoin() throws Exception {
+        verifyKTableKTableJoin(JoinType.OUTER, JoinType.LEFT,  Arrays.asList(
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", "null-C2-C3")));
+    }
+
+    @Test
+    public void shouldOuterOuterJoin() throws Exception {
+        verifyKTableKTableJoin(JoinType.OUTER, JoinType.OUTER, Arrays.asList(
+                new KeyValue<>("a", "null-A3"),
+                new KeyValue<>("b", "null-B3"),
+                new KeyValue<>("c", "null-C3"),
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", "null-C2-C3")));
+    }
+
+
+    private void verifyKTableKTableJoin(final JoinType joinType1,
+                                        final JoinType joinType2,
+                                        final List<KeyValue<String, String>> expectedResult) throws Exception {
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join");
+
+        streams = prepareTopology(joinType1, joinType2);
+        streams.start();
+
+
+        final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+                CONSUMER_CONFIG,
+                OUTPUT,
+                expectedResult.size());
+
+        assertThat(result, equalTo(expectedResult));
+    }
+    private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType joinType2) {
         final KStreamBuilder builder = new KStreamBuilder();
 
         final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1);
@@ -226,20 +256,4 @@ public class KTableKTableJoinIntegrationTest {
         throw new RuntimeException("Unknown join type.");
     }
 
-    @Test
-    public void KTableKTableJoin() throws Exception {
-        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join");
-
-        streams = prepareTopology();
-        streams.start();
-
-
-        final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            CONSUMER_CONFIG,
-            OUTPUT,
-            expectedResult.size());
-
-        assertThat(result, equalTo(expectedResult));
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index a2b56d3..012462a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.StreamsMetadata;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -55,10 +56,7 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -77,7 +75,7 @@ import java.util.concurrent.TimeUnit;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 
-@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
 public class QueryableStateIntegrationTest {
     private static final int NUM_BROKERS = 1;
     private static final long COMMIT_INTERVAL_MS = 300L;
@@ -85,7 +83,7 @@ public class QueryableStateIntegrationTest {
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER =
         new EmbeddedKafkaCluster(NUM_BROKERS);
-    public static final int STREAM_THREE_PARTITIONS = 4;
+    private static final int STREAM_THREE_PARTITIONS = 4;
     private final MockTime mockTime = CLUSTER.time;
     private String streamOne = "stream-one";
     private String streamTwo = "stream-two";
@@ -106,7 +104,7 @@ public class QueryableStateIntegrationTest {
     private Comparator<KeyValue<String, Long>> stringLongComparator;
     private static int testNo = 0;
 
-    public void createTopics() throws InterruptedException {
+    private void createTopics() throws InterruptedException {
         streamOne = streamOne + "-" + testNo;
         streamConcurrent = streamConcurrent + "-" + testNo;
         streamThree = streamThree + "-" + testNo;
@@ -123,15 +121,6 @@ public class QueryableStateIntegrationTest {
         CLUSTER.createTopic(outputTopicThree);
     }
 
-    @Parameter
-    public long cacheSizeBytes;
-
-    //Single parameter, use Object[]
-    @Parameters
-    public static Object[] data() {
-        return new Object[]{0, 10 * 1024 * 1024L};
-    }
-
     @Before
     public void before() throws IOException, InterruptedException {
         testNo++;
@@ -147,8 +136,7 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("qs-test").getPath());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+
 
         stringComparator = new Comparator<KeyValue<String, String>>() {
 
@@ -426,7 +414,17 @@ public class QueryableStateIntegrationTest {
     }
 
     @Test
-    public void shouldBeAbleToQueryState() throws Exception {
+    public void shouldBeAbleToQueryStateWithZeroSizedCache() throws Exception {
+        verifyCanQueryState(0);
+    }
+
+    @Test
+    public void shouldBeAbleToQueryStateWithNonZeroSizedCache() throws Exception {
+        verifyCanQueryState(10 * 1024 * 1024);
+    }
+
+    private void verifyCanQueryState(int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
         final KStreamBuilder builder = new KStreamBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
 
@@ -446,13 +444,13 @@ public class QueryableStateIntegrationTest {
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
                 streamOne,
-            batch1,
-            TestUtils.producerConfig(
+                batch1,
+                TestUtils.producerConfig(
                 CLUSTER.bootstrapServers(),
                 StringSerializer.class,
                 StringSerializer.class,
                 new Properties()),
-            mockTime);
+                mockTime);
 
         final KStream<String, String> s1 = builder.stream(streamOne);
 
@@ -477,7 +475,6 @@ public class QueryableStateIntegrationTest {
             myCount);
 
         verifyRangeAndAll(expectedCount, myCount);
-
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index e115861..a84a208 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -46,6 +47,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -65,6 +67,7 @@ import static org.junit.Assert.fail;
  * End-to-end integration test based on using regex and named topics for creating sources, using
  * an embedded Kafka cluster.
  */
+@Category({IntegrationTest.class})
 public class RegexSourceIntegrationTest {
     private static final int NUM_BROKERS = 1;
     @ClassRule

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 88a8545..ce29f32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.AfterClass;
@@ -47,6 +48,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -60,6 +62,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 /**
  * Tests local state store and global application cleanup.
  */
+@Category({IntegrationTest.class})
 public class ResetIntegrationTest {
     private static final int NUM_BROKERS = 1;
 


Mime
View raw message