kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Replacing for with foreach loop in stream test classes
Date Tue, 10 Jan 2017 19:53:07 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fe82330f0 -> 75469a3b6


MINOR: Replacing for with foreach loop in stream test classes

Author: Prabhat Kashyap <prabhat.kashyap@knoldus.in>

Reviewers: Ismael Juma, Damian Guy, Guozhang Wang

Closes #2305 from PKOfficial/code-refactor


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/75469a3b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/75469a3b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/75469a3b

Branch: refs/heads/trunk
Commit: 75469a3b602c26ea81d6fc0a409d39d321195ea4
Parents: fe82330
Author: Prabhat Kashyap <prabhat.kashyap@knoldus.in>
Authored: Tue Jan 10 11:53:03 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jan 10 11:53:03 2017 -0800

----------------------------------------------------------------------
 .../kstream/internals/KStreamBranchTest.java    |   4 +-
 .../kstream/internals/KStreamFilterTest.java    |   8 +-
 .../kstream/internals/KStreamFlatMapTest.java   |   4 +-
 .../internals/KStreamFlatMapValuesTest.java     |   4 +-
 .../internals/KStreamKStreamJoinTest.java       | 184 +++++++++----------
 .../internals/KStreamKStreamLeftJoinTest.java   |  48 ++---
 .../internals/KStreamKTableJoinTest.java        |  16 +-
 .../internals/KStreamKTableLeftJoinTest.java    |  16 +-
 .../kstream/internals/KStreamMapTest.java       |   4 +-
 .../kstream/internals/KStreamMapValuesTest.java |   4 +-
 .../kstream/internals/KStreamTransformTest.java |   4 +-
 .../internals/KStreamTransformValuesTest.java   |   4 +-
 .../kstream/internals/KTableAggregateTest.java  |   9 +-
 .../kstream/internals/KTableKTableJoinTest.java |  32 ++--
 .../internals/KTableKTableLeftJoinTest.java     |  48 ++---
 .../internals/KTableKTableOuterJoinTest.java    |  48 ++---
 ...gedSortedCacheKeyValueStoreIteratorTest.java |  16 +-
 17 files changed, 227 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index fb19c0f..4e396ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -87,8 +87,8 @@ public class KStreamBranchTest {
         }
 
         driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topicName, expectedKey, "V" + expectedKey);
         }
 
         assertEquals(3, processors[0].processed.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index 4be8513..962df07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -63,8 +63,8 @@ public class KStreamFilterTest {
         stream.filter(isMultipleOfThree).process(processor);
 
         driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topicName, expectedKey, "V" + expectedKey);
         }
 
         assertEquals(2, processor.processed.size());
@@ -83,8 +83,8 @@ public class KStreamFilterTest {
         stream.filterNot(isMultipleOfThree).process(processor);
 
         driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topicName, expectedKey, "V" + expectedKey);
         }
 
         assertEquals(5, processor.processed.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index da57d4b..724b08b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -71,8 +71,8 @@ public class KStreamFlatMapTest {
         stream.flatMap(mapper).process(processor);
 
         driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topicName, expectedKey, "V" + expectedKey);
         }
 
         assertEquals(6, processor.processed.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index 9d1141b..ff4a581 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -70,8 +70,8 @@ public class KStreamFlatMapValuesTest {
         stream.flatMapValues(mapper).process(processor);
 
         driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topicName, expectedKey, "V" + expectedKey);
         }
 
         assertEquals(8, processor.processed.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index f03a201..7432f1d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -120,8 +120,8 @@ public class KStreamKStreamJoinTest {
         // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
         //     w2 = { 0:Y0, 1:Y1 }
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
@@ -132,8 +132,8 @@ public class KStreamKStreamJoinTest {
         // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
         //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
@@ -144,8 +144,8 @@ public class KStreamKStreamJoinTest {
         // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
         //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
@@ -218,8 +218,8 @@ public class KStreamKStreamJoinTest {
         // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
         //     w2 = { 0:Y0, 1:Y1 }
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
@@ -230,8 +230,8 @@ public class KStreamKStreamJoinTest {
         // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
         //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
@@ -242,8 +242,8 @@ public class KStreamKStreamJoinTest {
         // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
         //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
@@ -329,36 +329,36 @@ public class KStreamKStreamJoinTest {
         time = 1000 + 100L;
         setRecordContext(time, topic2);
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("3:X3+YY3");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult();
@@ -367,36 +367,36 @@ public class KStreamKStreamJoinTest {
 
         time = 1000L - 100L - 1L;
         setRecordContext(time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult();
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
@@ -415,36 +415,36 @@ public class KStreamKStreamJoinTest {
 
         time = 2000L + 100L;
         setRecordContext(time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
         setRecordContext(++time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
         setRecordContext(++time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
 
         setRecordContext(++time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("3:XX3+Y3");
 
         setRecordContext(++time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
 
         processor.checkAndClearProcessResult();
@@ -453,36 +453,36 @@ public class KStreamKStreamJoinTest {
 
         time = 2000L - 100L - 1L;
         setRecordContext(time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
 
         processor.checkAndClearProcessResult();
 
         setRecordContext(++time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0");
 
         setRecordContext(++time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
 
         setRecordContext(++time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
 
         setRecordContext(++time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
@@ -525,36 +525,36 @@ public class KStreamKStreamJoinTest {
         time = 1000L - 1L;
         setRecordContext(time, topic2);
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult();
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
@@ -562,36 +562,36 @@ public class KStreamKStreamJoinTest {
         time = 1000 + 100L;
         setRecordContext(time, topic2);
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("3:X3+YY3");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult();
@@ -634,72 +634,72 @@ public class KStreamKStreamJoinTest {
         time = 1000L - 100L - 1L;
 
         setRecordContext(time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult();
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
         time = 1000L;
 
         setRecordContext(time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("3:X3+YY3");
 
         setRecordContext(++time, topic2);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult();

http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 58090fd..890d8f6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -135,8 +135,8 @@ public class KStreamKStreamLeftJoinTest {
         // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
         // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2");
@@ -147,8 +147,8 @@ public class KStreamKStreamLeftJoinTest {
         // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
         // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
@@ -231,36 +231,36 @@ public class KStreamKStreamLeftJoinTest {
 
         time = 1000L + 100L;
         setRecordContext(time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
         setRecordContext(++time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
         setRecordContext(++time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
 
         setRecordContext(++time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
 
         setRecordContext(++time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
@@ -269,36 +269,36 @@ public class KStreamKStreamLeftJoinTest {
 
         time = 1000L - 100L - 1L;
         setRecordContext(time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
 
         setRecordContext(++time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
 
         setRecordContext(++time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
 
         setRecordContext(++time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
 
         setRecordContext(++time, topic1);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");

http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 742e852..b8142fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -104,23 +104,23 @@ public class KStreamKTableJoinTest {
 
         // push all four items to the primary stream. this should produce two items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
         // push all items to the other stream. this should not produce any item
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult();
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
@@ -135,8 +135,8 @@ public class KStreamKTableJoinTest {
 
         // push all four items to the primary stream. this should produce two items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3");

http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index f2fad78..94e7888 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -104,23 +104,23 @@ public class KStreamKTableLeftJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
 
         // push all items to the other stream. this should not produce any item
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
 
         processor.checkAndClearProcessResult();
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
@@ -135,8 +135,8 @@ public class KStreamKTableLeftJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
 
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");

http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 00e5d70..d843f93 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -68,8 +68,8 @@ public class KStreamMapTest {
         stream.map(mapper).process(processor);
 
         driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topicName, expectedKey, "V" + expectedKey);
         }
 
         assertEquals(4, processor.processed.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index e48b677..802b019 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -66,8 +66,8 @@ public class KStreamMapValuesTest {
         stream.mapValues(mapper).process(processor);
 
         driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], Integer.toString(expectedKeys[i]));
+        for (int expectedKey : expectedKeys) {
+            driver.process(topicName, expectedKey, Integer.toString(expectedKey));
         }
 
         assertEquals(4, processor.processed.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index e0bdfbc..cbb2764 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -88,8 +88,8 @@ public class KStreamTransformTest {
         stream.transform(transformerSupplier).process(processor);
 
         driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topicName, expectedKey, expectedKey * 10);
         }
 
         driver.punctuate(2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index aebcc76..600a0b4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -88,8 +88,8 @@ public class KStreamTransformValuesTest {
         stream.transformValues(valueTransformerSupplier).process(processor);
 
         driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topicName, expectedKey, expectedKey * 10);
         }
 
         assertEquals(4, processor.processed.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 8378a79..d8ec89b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -156,13 +156,14 @@ public class KTableAggregateTest {
         KTable<String, String> table2 = table1.groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
             @Override
                 public KeyValue<String, String> apply(String key, String value) {
-                    if (key.equals("null")) {
+                switch (key) {
+                    case "null":
                         return KeyValue.pair(null, value);
-                    } else if (key.equals("NULL")) {
+                    case "NULL":
                         return null;
-                    } else {
+                    default:
                         return KeyValue.pair(value, value);
-                    }
+                }
                 }
             },
                 stringSerde,

http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index 1bd8600..05cc785 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -158,8 +158,8 @@ public class KTableKTableJoinTest {
 
         // push all four items to the primary stream. this should produce two items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
 
@@ -212,24 +212,24 @@ public class KTableKTableJoinTest {
 
         // push all four items to the primary stream. this should produce two items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
 
         proc.checkAndClearProcessResult("0:(XX0+Y0<-null)", "1:(XX1+Y1<-null)");
 
         // push all items to the other stream. this should produce four items.
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(XX0+YY0<-null)", "1:(XX1+YY1<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
@@ -244,8 +244,8 @@ public class KTableKTableJoinTest {
 
         // push all four items to the primary stream. this should produce two items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
@@ -296,15 +296,15 @@ public class KTableKTableJoinTest {
 
         // push all four items to the primary stream. this should produce two items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(XX0+Y0<-X0+Y0)", "1:(XX1+Y1<-X1+Y1)");
 
         // push all items to the other stream. this should produce four items.
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(XX0+YY0<-XX0+Y0)", "1:(XX1+YY1<-XX1+Y1)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
@@ -327,8 +327,8 @@ public class KTableKTableJoinTest {
 
         // push all four items to the primary stream. this should produce two items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");

http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 9549869..551fa1e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -120,16 +120,16 @@ public class KTableKTableLeftJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
         checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
 
         // push all items to the other stream. this should produce four items.
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
@@ -137,8 +137,8 @@ public class KTableKTableLeftJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
@@ -155,8 +155,8 @@ public class KTableKTableLeftJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
@@ -206,23 +206,23 @@ public class KTableKTableLeftJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
         // push all items to the other stream. this should produce four items.
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
@@ -237,8 +237,8 @@ public class KTableKTableLeftJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
@@ -289,23 +289,23 @@ public class KTableKTableLeftJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
         // push all items to the other stream. this should produce four items.
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
@@ -320,8 +320,8 @@ public class KTableKTableLeftJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");

http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 8d1c70a..8673240 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -117,16 +117,16 @@ public class KTableKTableOuterJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
         checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
 
         // push all items to the other stream. this should produce four items.
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
@@ -134,8 +134,8 @@ public class KTableKTableOuterJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
@@ -152,8 +152,8 @@ public class KTableKTableOuterJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
@@ -211,23 +211,23 @@ public class KTableKTableOuterJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
         // push all items to the other stream. this should produce four items.
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
@@ -242,8 +242,8 @@ public class KTableKTableOuterJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
@@ -301,23 +301,23 @@ public class KTableKTableOuterJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
         // push all items to the other stream. this should produce four items.
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
@@ -332,8 +332,8 @@ public class KTableKTableOuterJoinTest {
 
         // push all four items to the primary stream. this should produce four items.
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");

http://git-wip-us.apache.org/repos/asf/kafka/blob/75469a3b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
index db391ed..14abaa0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
@@ -98,9 +98,9 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
     @Test
     public void shouldNotHaveNextIfAllCachedItemsDeleted() throws Exception {
         final byte[][] bytes = {{0}, {1}, {2}};
-        for (int i = 0; i < bytes.length; i++) {
-            store.put(Bytes.wrap(bytes[i]), bytes[i]);
-            cache.put(namespace, bytes[i], new LRUCacheEntry(null));
+        for (byte[] aByte : bytes) {
+            store.put(Bytes.wrap(aByte), aByte);
+            cache.put(namespace, aByte, new LRUCacheEntry(null));
         }
         assertFalse(createIterator().hasNext());
     }
@@ -108,8 +108,8 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
     @Test
     public void shouldNotHaveNextIfOnlyCacheItemsAndAllDeleted() throws Exception {
         final byte[][] bytes = {{0}, {1}, {2}};
-        for (int i = 0; i < bytes.length; i++) {
-            cache.put(namespace, bytes[i], new LRUCacheEntry(null));
+        for (byte[] aByte : bytes) {
+            cache.put(namespace, aByte, new LRUCacheEntry(null));
         }
         assertFalse(createIterator().hasNext());
     }
@@ -117,9 +117,9 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
     @Test
     public void shouldSkipAllDeletedFromCache() throws Exception {
         final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}};
-        for (int i = 0; i < bytes.length; i++) {
-            store.put(Bytes.wrap(bytes[i]), bytes[i]);
-            cache.put(namespace, bytes[i], new LRUCacheEntry(bytes[i]));
+        for (byte[] aByte : bytes) {
+            store.put(Bytes.wrap(aByte), aByte);
+            cache.put(namespace, aByte, new LRUCacheEntry(aByte));
         }
         cache.put(namespace, bytes[1], new LRUCacheEntry(null));
         cache.put(namespace, bytes[2], new LRUCacheEntry(null));


Mime
View raw message