kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.4 updated: MINOR: Modified Exception handling for KIP-470 (#7461)
Date Tue, 08 Oct 2019 02:34:03 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 7f33072  MINOR: Modified Exception handling for KIP-470 (#7461)
7f33072 is described below

commit 7f330725de8ad26f6e17d579f922a09aa003772a
Author: Jukka Karvanen <48978068+jukkakarvanen@users.noreply.github.com>
AuthorDate: Tue Oct 8 05:32:40 2019 +0300

    MINOR: Modified Exception handling for KIP-470 (#7461)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../org/apache/kafka/streams/TestOutputTopic.java  |  2 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  4 +++-
 .../org/apache/kafka/streams/TestTopicsTest.java   |  9 +++++++++
 .../kafka/streams/TopologyTestDriverTest.java      | 22 +++++++++++++++-------
 4 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TestOutputTopic.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TestOutputTopic.java
index c3f690a..da3629e 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TestOutputTopic.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TestOutputTopic.java
@@ -131,7 +131,7 @@ public class TestOutputTopic<K, V> {
         while (!isEmpty()) {
             outputRow = readRecord();
             if (outputRow.key() == null) {
-                throw new NullPointerException("Null keys not allowed");
+                throw new IllegalStateException("Null keys not allowed with readKeyValuesToMap method");
             }
             output.put(outputRow.key(), outputRow.value());
         }
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 9a6de24..43324ee 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -728,11 +728,13 @@ public class TopologyTestDriver implements Closeable {
                            final Instant time) {
         final byte[] serializedKey = keySerializer.serialize(topic, record.headers(), record.key());
         final byte[] serializedValue = valueSerializer.serialize(topic, record.headers(), record.value());
-        long timestamp = 0;
+        final long timestamp;
         if (time != null) {
             timestamp = time.toEpochMilli();
         } else if (record.timestamp() != null) {
             timestamp = record.timestamp();
+        } else {
+            throw new IllegalStateException("Provided `TestRecord` does not have a timestamp and no timestamp overwrite was provided via `time` parameter.");
         }
 
         pipeRecord(topic, timestamp, serializedKey, serializedValue, record.headers());
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
index d8c7847..3819688 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
@@ -165,6 +165,15 @@ public class TestTopicsTest {
     }
 
     @Test
+    public void testKeyValuesToMapWithNull() {
+        final TestInputTopic<Long, String> inputTopic = testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
+        final TestOutputTopic<Long, String> outputTopic = testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
+        inputTopic.pipeInput("value");
+        assertThrows(IllegalStateException.class, () -> outputTopic.readKeyValuesToMap());
+    }
+
+
+    @Test
     public void testKeyValueListDuration() {
         final TestInputTopic<Long, String> inputTopic = testDriver.createInputTopic(INPUT_TOPIC_MAP, longSerde.serializer(), stringSerde.serializer());
         final TestOutputTopic<String, Long> outputTopic = testDriver.createOutputTopic(OUTPUT_TOPIC_MAP, stringSerde.deserializer(), longSerde.deserializer());
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 14cfbc6..7e95392 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -401,7 +401,15 @@ public class TopologyTestDriverTest {
 
         testDriver = new TopologyTestDriver(new Topology(), config);
         assertThrows(IllegalArgumentException.class, () -> {
-            pipeRecord(unknownTopic, new TestRecord<byte[], byte[]>(nullValue));
+            testDriver.pipeRecord(unknownTopic, new TestRecord<byte[], byte[]>(nullValue), new ByteArraySerializer(), new ByteArraySerializer(), Instant.now());
+        });
+    }
+
+    @Test
+    public void shouldThrowForMissingTime() {
+        testDriver = new TopologyTestDriver(new Topology(), config);
+        assertThrows(IllegalStateException.class, () -> {
+            testDriver.pipeRecord(SINK_TOPIC_1, new TestRecord<String, String>("value"), new StringSerializer(), new StringSerializer(), null);
         });
     }
 
@@ -584,7 +592,7 @@ public class TopologyTestDriverTest {
                 consumerRecord1,
                 Serdes.Long().serializer(),
                 Serdes.String().serializer(),
-                null);
+                Instant.now());
         final TestRecord<Long, String> result1 =
             testDriver.readRecord(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer());
         assertThat(result1.getKey(), equalTo(source1Key));
@@ -594,7 +602,7 @@ public class TopologyTestDriverTest {
                 consumerRecord2,
                 Serdes.Integer().serializer(),
                 Serdes.Double().serializer(),
-                null);
+                Instant.now());
         final TestRecord<Integer, Double> result2 =
             testDriver.readRecord(SINK_TOPIC_1, Serdes.Integer().deserializer(), Serdes.Double().deserializer());
         assertThat(result2.getKey(), equalTo(source2Key));
@@ -627,7 +635,7 @@ public class TopologyTestDriverTest {
                 consumerRecord1,
                 Serdes.Long().serializer(),
                 Serdes.String().serializer(),
-                null);
+                Instant.now());
         final TestRecord<Long, String> result1 =
                 testDriver.readRecord(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer());
         assertThat(result1.getKey(), equalTo(source1Key));
@@ -637,7 +645,7 @@ public class TopologyTestDriverTest {
                 consumerRecord2,
                 Serdes.Integer().serializer(),
                 Serdes.Double().serializer(),
-                null);
+                Instant.now());
         final TestRecord<Integer, Double> result2 =
                 testDriver.readRecord(SINK_TOPIC_2, Serdes.Integer().deserializer(), Serdes.Double().deserializer());
         assertThat(result2.getKey(), equalTo(source2Key));
@@ -1345,7 +1353,7 @@ public class TopologyTestDriverTest {
         try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
             assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a"));
             testDriver.pipeRecord("input-topic", new TestRecord<String, Long>("a", 1L),
-                    new StringSerializer(), new LongSerializer(), null);
+                    new StringSerializer(), new LongSerializer(), Instant.now());
             Assert.assertEquals(1L, testDriver.getKeyValueStore("storeProcessorStore").get("a"));
         }
 
@@ -1369,7 +1377,7 @@ public class TopologyTestDriverTest {
             final KeyValueStore<String, String> globalStore = testDriver.getKeyValueStore("globalStore");
             Assert.assertNotNull(globalStore);
             Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore"));
-            testDriver.pipeRecord("topic", new TestRecord<String, String>("k1", "value1"), new StringSerializer(), new StringSerializer(), null);
+            testDriver.pipeRecord("topic", new TestRecord<String, String>("k1", "value1"), new StringSerializer(), new StringSerializer(), Instant.now());
             // we expect to have both in the global store, the one from pipeInput and the one from the producer
             Assert.assertEquals("value1", globalStore.get("k1"));
         }


Mime
View raw message