kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Fix KStreamKTableJoinTest and StreamTaskTest (#9357)
Date Thu, 01 Oct 2020 03:08:13 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 95986a8  MINOR: Fix KStreamKTableJoinTest and StreamTaskTest (#9357)
95986a8 is described below

commit 95986a8f48da47645659556e2b03149e138d1d4e
Author: leah <lthomas@confluent.io>
AuthorDate: Wed Sep 30 22:07:23 2020 -0500

    MINOR: Fix KStreamKTableJoinTest and StreamTaskTest (#9357)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 checkstyle/suppressions.xml                        |  3 ++
 .../kstream/internals/KStreamKTableJoinTest.java   |  4 +--
 .../processor/internals/StreamTaskTest.java        | 32 +++++++++++-----------
 3 files changed, 21 insertions(+), 18 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index dead182..454711c 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -208,6 +208,9 @@
     <suppress checks="MethodLength"
               files="KStreamSlidingWindowAggregateTest.java"/>
 
+    <suppress checks="ClassFanOutComplexity"
+              files="StreamTaskTest.java"/>
+
     <!-- Streams test-utils -->
     <suppress checks="ClassFanOutComplexity"
               files="TopologyTestDriver.java"/>
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index c506779..80b776e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -255,7 +255,7 @@ public class KStreamKTableJoinTest {
             }
             assertThat(
                 appender.getMessages(),
-                hasItem("Skipping record due to null key or value. key=[null] value=[A] topic=[streamTopic]
partition=[0] "
+                hasItem("Skipping record due to null join key or value. key=[null] value=[A]
topic=[streamTopic] partition=[0] "
                     + "offset=[0]"));
         }
     }
@@ -282,7 +282,7 @@ public class KStreamKTableJoinTest {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("Skipping record due to null key or value. key=[1] value=[null] topic=[streamTopic]
partition=[0] "
+                hasItem("Skipping record due to null join key or value. key=[1] value=[null]
topic=[streamTopic] partition=[0] "
                     + "offset=[0]")
             );
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 229df65..30e20b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -2050,22 +2050,22 @@ public class StreamTaskTest {
         final ProcessorTopology topology = withSources(asList(), mkMap());
 
         final TopologyException  exception = assertThrows(
-                TopologyException.class,
-                () -> new StreamTask(
-                        taskId,
-                        partitions,
-                        topology,
-                        consumer,
-                        createConfig(false, "100"),
-                        metrics,
-                        stateDirectory,
-                        cache,
-                        time,
-                        stateManager,
-                        recordCollector,
-                        context
-                )
-            );
+            TopologyException.class,
+            () -> new StreamTask(
+                taskId,
+                partitions,
+                topology,
+                consumer,
+                createConfig(false, "100"),
+                metrics,
+                stateDirectory,
+                cache,
+                time,
+                stateManager,
+                recordCollector,
+                context
+            )
+        );
 
         assertThat(exception.getMessage(), equalTo("Invalid topology: " +
                 "Topic is unkown to the topology. This may happen if different KafkaStreams
instances of the same " +


Mime
View raw message