kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-3896: Fix KStream-KStream leftJoin in RepartitionIntegrationTest
Date Thu, 02 Feb 2017 18:33:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 35cd008e5 -> 7436d28a2


KAFKA-3896: Fix KStream-KStream leftJoin in RepartitionIntegrationTest

The transient duplicates are caused by the flawed design of the left-join test itself:
in order to ignore partially joined results such as `A:null`, it lets the producer potentially
send to source stream one twice and relies on all of the following conditions being true for
the test to pass:

1. `receiveMessages` happens to have fetched all of the produced results and committed its offsets.
2. The streams app happens to have finished sending all of the result data.
3. The consumer used in `receiveMessages` gets all of the messages in a single poll().

If any of the above is not true, the test fails.

Fixed the test by adding a filter right after the left join that drops the partially joined results.
Also did minor cleanup on the integration test utils.
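
A minimal sketch of the approach, for illustration only (the exact change is in the
KStreamRepartitionJoinTest diff below; `joinedStream` and `outputTopic` are placeholder names):

    // Drop partially joined (left-only) results such as "A:null" before they
    // reach the output topic, so the expected output is deterministic.
    joinedStream
        .filterNot(new Predicate<Integer, String>() {
            @Override
            public boolean test(final Integer key, final String value) {
                // a left-only join result carries "null" on its right-hand side
                return value.substring(2).equals("null");
            }
        })
        .to(Serdes.Integer(), Serdes.String(), outputTopic);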

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy, Ewen Cheslack-Postava

Closes #2485 from guozhangwang/K3896-duplicate-join-results


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7436d28a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7436d28a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7436d28a

Branch: refs/heads/trunk
Commit: 7436d28a243295ae7c8c3723e97ea43a10f2771c
Parents: 35cd008
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Thu Feb 2 10:33:33 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Feb 2 10:33:33 2017 -0800

----------------------------------------------------------------------
 .../integration/KStreamRepartitionJoinTest.java | 30 +++++++-------------
 .../integration/utils/IntegrationTestUtils.java | 23 ++++++++-------
 2 files changed, 24 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7436d28a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 43e5d87..f08bc72 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockValueJoiner;
@@ -143,7 +144,7 @@ public class KStreamRepartitionJoinTest {
         verifyCorrectOutput(flatMapJoin);
         verifyCorrectOutput(mapRhs);
         verifyCorrectOutput(mapJoinJoin);
-        verifyLeftJoin(leftJoin);
+        verifyCorrectOutput(leftJoin);
     }
 
     private ExpectedOutputOnTopic mapStreamOneAndJoin() throws InterruptedException {
@@ -232,6 +233,13 @@ public class KStreamRepartitionJoinTest {
             Serdes.Integer(),
             Serdes.Integer(),
             Serdes.String())
+            .filterNot(new Predicate<Integer, String>() {
+                @Override
+                public boolean test(Integer key, String value) {
+                    // filter out left-only join results such as "A:null"
+                    return value.substring(2).equals("null");
+                }
+            })
             .to(Serdes.Integer(), Serdes.String(), outputTopic);
 
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
@@ -268,7 +276,7 @@ public class KStreamRepartitionJoinTest {
     }
 
     private JoinWindows getJoinWindow() {
-        return (JoinWindows) JoinWindows.of(WINDOW_SIZE).until(3 * WINDOW_SIZE);
+        return JoinWindows.of(WINDOW_SIZE).until(3 * WINDOW_SIZE);
     }
 
 
@@ -282,7 +290,6 @@ public class KStreamRepartitionJoinTest {
         }
     }
 
-
     private void verifyCorrectOutput(final ExpectedOutputOnTopic expectedOutputOnTopic)
         throws InterruptedException {
         assertThat(receiveMessages(new StringDeserializer(),
@@ -291,16 +298,6 @@ public class KStreamRepartitionJoinTest {
             is(expectedOutputOnTopic.expectedOutput));
     }
 
-    private void verifyLeftJoin(final ExpectedOutputOnTopic expectedOutputOnTopic)
-        throws InterruptedException, ExecutionException {
-        final List<String> received = receiveMessages(new StringDeserializer(), expectedOutputOnTopic
-            .expectedOutput.size(), expectedOutputOnTopic.outputTopic);
-        if (!received.equals(expectedOutputOnTopic.expectedOutput)) {
-            produceToStreamOne();
-            verifyCorrectOutput(expectedOutputOnTopic.expectedOutput, expectedOutputOnTopic.outputTopic);
-        }
-    }
-
     private void produceMessages()
         throws ExecutionException, InterruptedException {
         produceToStreamOne();
@@ -380,13 +377,8 @@ public class KStreamRepartitionJoinTest {
             numMessages,
             60 * 1000);
         Collections.sort(received);
-        return received;
-    }
 
-    private void verifyCorrectOutput(final List<String> expectedMessages,
-                                     final String topic) throws InterruptedException {
-        assertThat(receiveMessages(new StringDeserializer(), expectedMessages.size(), topic),
-            is(expectedMessages));
+        return received;
     }
 
     private void doJoin(final KStream<Integer, Integer> lhs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/7436d28a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 08e22cc..a38781b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -61,12 +61,13 @@ public class IntegrationTestUtils {
      *
      * @param topic          Kafka topic to read messages from
      * @param consumerConfig Kafka consumer configuration
+     * @param waitTime       Maximum wait time in milliseconds
      * @param maxMessages    Maximum number of messages to read via the consumer.
      * @return The values retrieved via the consumer.
      */
-    public static <V> List<V> readValues(final String topic, final Properties consumerConfig, final int maxMessages) {
+    public static <V> List<V> readValues(final String topic, final Properties consumerConfig, final long waitTime, final int maxMessages) {
         final List<V> returnList = new ArrayList<>();
-        final List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages);
+        final List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, waitTime, maxMessages);
         for (final KeyValue<?, V> kv : kvs) {
             returnList.add(kv.value);
         }
@@ -79,10 +80,11 @@ public class IntegrationTestUtils {
      *
      * @param topic          Kafka topic to read messages from
      * @param consumerConfig Kafka consumer configuration
+     * @param waitTime       Maximum wait time in milliseconds
      * @return The KeyValue elements retrieved via the consumer.
      */
-    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig) {
-        return readKeyValues(topic, consumerConfig, UNLIMITED_MESSAGES);
+    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig, final long waitTime) {
+        return readKeyValues(topic, consumerConfig, waitTime, UNLIMITED_MESSAGES);
     }
 
     /**
@@ -91,17 +93,17 @@ public class IntegrationTestUtils {
      *
      * @param topic          Kafka topic to read messages from
      * @param consumerConfig Kafka consumer configuration
+     * @param waitTime       Maximum wait time in milliseconds
      * @param maxMessages    Maximum number of messages to read via the consumer
      * @return The KeyValue elements retrieved via the consumer
      */
-    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig, final int maxMessages) {
+    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig, final long waitTime, final int maxMessages) {
         final KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig);
         consumer.subscribe(Collections.singletonList(topic));
         final int pollIntervalMs = 100;
-        final int maxTotalPollTimeMs = 2000;
-        int totalPollTimeMs = 0;
         final List<KeyValue<K, V>> consumedValues = new ArrayList<>();
-        while (totalPollTimeMs < maxTotalPollTimeMs && continueConsuming(consumedValues.size(), maxMessages)) {
+        int totalPollTimeMs = 0;
+        while (totalPollTimeMs < waitTime && continueConsuming(consumedValues.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
             final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
             for (final ConsumerRecord<K, V> record : records) {
@@ -208,7 +210,7 @@ public class IntegrationTestUtils {
         final TestCondition valuesRead = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                final List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig);
+                final List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig, waitTime);
                 accumData.addAll(readData);
                 return accumData.size() >= expectedNumRecords;
             }
@@ -248,8 +250,9 @@ public class IntegrationTestUtils {
         final TestCondition valuesRead = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                final List<V> readData = readValues(topic, consumerConfig, expectedNumRecords);
+                final List<V> readData = readValues(topic, consumerConfig, waitTime, expectedNumRecords);
                 accumData.addAll(readData);
+
                 return accumData.size() >= expectedNumRecords;
             }
         };

