kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch trunk updated: MINOR: improve error message for incorrect window-serde initialization (#7067)
Date Wed, 17 Jul 2019 22:40:21 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f899489   MINOR: improve error message for incorrect window-serde initialization (#7067)
f899489 is described below

commit f8994897d30ef7508756dceb1d19171d6204f2a2
Author: Matthias J. Sax <matthias@confluent.io>
AuthorDate: Wed Jul 17 15:39:55 2019 -0700

     MINOR: improve error message for incorrect window-serde initialization (#7067)
    
    Reviewers: Tim Berglund <tim@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>
---
 .../kstream/SessionWindowedDeserializer.java       |  6 +-
 .../streams/kstream/SessionWindowedSerializer.java |  9 +-
 .../streams/kstream/TimeWindowedDeserializer.java  |  9 +-
 .../streams/kstream/TimeWindowedSerializer.java    |  8 +-
 .../kafka/streams/kstream/WindowedSerdes.java      | 20 +++++
 .../kafka/streams/kstream/WindowedSerdesTest.java  | 96 ++++++++++++++++++++++
 6 files changed, 142 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
index e2e0400..5638ec3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
@@ -59,6 +59,8 @@ public class SessionWindowedDeserializer<T> implements Deserializer<Windowed<T>>
 
     @Override
     public Windowed<T> deserialize(final String topic, final byte[] data) {
+        WindowedSerdes.verifyInnerDeserializerNotNull(inner, this);
+
         if (data == null || data.length == 0) {
             return null;
         }
@@ -69,7 +71,9 @@ public class SessionWindowedDeserializer<T> implements Deserializer<Windowed<T>>
 
     @Override
     public void close() {
-        inner.close();
+        if (inner != null) {
+            inner.close();
+        }
     }
 
     // Only for testing
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
index 430fd63..00491c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
@@ -60,21 +60,26 @@ public class SessionWindowedSerializer<T> implements WindowedSerializer<T> {
 
     @Override
     public byte[] serialize(final String topic, final Windowed<T> data) {
+        WindowedSerdes.verifyInnerSerializerNotNull(inner, this);
+
         if (data == null) {
             return null;
         }
-
         // for either key or value, their schema is the same hence we will just use session key schema
         return SessionKeySchema.toBinary(data, inner, topic);
     }
 
     @Override
     public void close() {
-        inner.close();
+        if (inner != null) {
+            inner.close();
+        }
     }
 
     @Override
     public byte[] serializeBaseKey(final String topic, final Windowed<T> data) {
+        WindowedSerdes.verifyInnerSerializerNotNull(inner, this);
+
         return inner.serialize(topic, data.key());
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
index 4adae4a..c852549 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
@@ -75,13 +75,16 @@ public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
 
     @Override
     public Windowed<T> deserialize(final String topic, final byte[] data) {
+        WindowedSerdes.verifyInnerDeserializerNotNull(inner, this);
+
         if (data == null || data.length == 0) {
             return null;
         }
 
         // toStoreKeyBinary was used to serialize the data.
-        if (this.isChangelogTopic)
+        if (this.isChangelogTopic) {
             return WindowKeySchema.fromStoreKey(data, windowSize, inner, topic);
+        }
 
         // toBinary was used to serialize the data
         return WindowKeySchema.from(data, windowSize, inner, topic);
@@ -89,7 +92,9 @@ public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
 
     @Override
     public void close() {
-        inner.close();
+        if (inner != null) {
+            inner.close();
+        }
     }
 
     public void setIsChangelogTopic(final boolean isChangelogTopic) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
index 72cdcb1..ad3377f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
@@ -61,6 +61,8 @@ public class TimeWindowedSerializer<T> implements WindowedSerializer<T> {
 
     @Override
     public byte[] serialize(final String topic, final Windowed<T> data) {
+        WindowedSerdes.verifyInnerSerializerNotNull(inner, this);
+
         if (data == null) {
             return null;
         }
@@ -70,11 +72,15 @@ public class TimeWindowedSerializer<T> implements WindowedSerializer<T> {
 
     @Override
     public void close() {
-        inner.close();
+        if (inner != null) {
+            inner.close();
+        }
     }
 
     @Override
     public byte[] serializeBaseKey(final String topic, final Windowed<T> data) {
+        WindowedSerdes.verifyInnerSerializerNotNull(inner, this);
+
         return inner.serialize(topic, data.key());
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
index 44489fd..07beb5a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 
 public class WindowedSerdes {
 
@@ -76,4 +78,22 @@ public class WindowedSerdes {
     static public <T> Serde<Windowed<T>> sessionWindowedSerdeFrom(final Class<T> type) {
         return new SessionWindowedSerde<>(Serdes.serdeFrom(type));
     }
+
+    static void verifyInnerSerializerNotNull(final Serializer inner,
+                                             final Serializer wrapper) {
+        if (inner == null) {
+            throw new NullPointerException("Inner serializer is `null`. " +
+                "User code must use constructor `" + wrapper.getClass().getSimpleName() + "(final Serializer<T> inner)` " +
+                "instead of the no-arg constructor.");
+        }
+    }
+
+    static void verifyInnerDeserializerNotNull(final Deserializer inner,
+                                               final Deserializer wrapper) {
+        if (inner == null) {
+            throw new NullPointerException("Inner deserializer is `null`. " +
+                "User code must use constructor `" + wrapper.getClass().getSimpleName() + "(final Deserializer<T> inner)` " +
+                "instead of the no-arg constructor.");
+        }
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
index ff180d1..8bc9291 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
@@ -24,6 +24,9 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 public class WindowedSerdesTest {
@@ -65,4 +68,97 @@ public class WindowedSerdesTest {
         final Windowed<Integer> windowed = sessionWindowedSerde.deserializer().deserialize(topic, bytes);
         Assert.assertEquals(sessionWindowed, windowed);
     }
+
+    @Test
+    public void timeWindowedSerializerShouldThrowNpeIfNotInitializedProperly() {
+        final TimeWindowedSerializer<byte[]> serializer = new TimeWindowedSerializer<>();
+        final NullPointerException exception = assertThrows(
+            NullPointerException.class,
+            () -> serializer.serialize("topic", new Windowed<>(new byte[0], new TimeWindow(0, 1))));
+        assertThat(
+            exception.getMessage(),
+            equalTo("Inner serializer is `null`. User code must use constructor " +
+                "`TimeWindowedSerializer(final Serializer<T> inner)` instead of the no-arg constructor."));
+    }
+
+    @Test
+    public void timeWindowedSerializerShouldThrowNpeOnSerializingBaseKeyIfNotInitializedProperly() {
+        final TimeWindowedSerializer<byte[]> serializer = new TimeWindowedSerializer<>();
+        final NullPointerException exception = assertThrows(
+            NullPointerException.class,
+            () -> serializer.serializeBaseKey("topic", new Windowed<>(new byte[0], new TimeWindow(0, 1))));
+        assertThat(
+            exception.getMessage(),
+            equalTo("Inner serializer is `null`. User code must use constructor " +
+                "`TimeWindowedSerializer(final Serializer<T> inner)` instead of the no-arg constructor."));
+    }
+
+    @Test
+    public void timeWindowedDeserializerShouldThrowNpeIfNotInitializedProperly() {
+        final TimeWindowedDeserializer<byte[]> deserializer = new TimeWindowedDeserializer<>();
+        final NullPointerException exception = assertThrows(
+            NullPointerException.class,
+            () -> deserializer.deserialize("topic", new byte[0]));
+        assertThat(
+            exception.getMessage(),
+            equalTo("Inner deserializer is `null`. User code must use constructor " +
+                "`TimeWindowedDeserializer(final Deserializer<T> inner)` instead of the no-arg constructor."));
+    }
+
+    @Test
+    public void sessionWindowedSerializerShouldThrowNpeIfNotInitializedProperly() {
+        final SessionWindowedSerializer<byte[]> serializer = new SessionWindowedSerializer<>();
+        final NullPointerException exception = assertThrows(
+            NullPointerException.class,
+            () -> serializer.serialize("topic", new Windowed<>(new byte[0], new SessionWindow(0, 0))));
+        assertThat(
+            exception.getMessage(),
+            equalTo("Inner serializer is `null`. User code must use constructor " +
+                "`SessionWindowedSerializer(final Serializer<T> inner)` instead of the no-arg constructor."));
+    }
+
+    @Test
+    public void sessionWindowedSerializerShouldThrowNpeOnSerializingBaseKeyIfNotInitializedProperly() {
+        final SessionWindowedSerializer<byte[]> serializer = new SessionWindowedSerializer<>();
+        final NullPointerException exception = assertThrows(
+            NullPointerException.class,
+            () -> serializer.serializeBaseKey("topic", new Windowed<>(new byte[0], new SessionWindow(0, 0))));
+        assertThat(
+            exception.getMessage(),
+            equalTo("Inner serializer is `null`. User code must use constructor " +
+                "`SessionWindowedSerializer(final Serializer<T> inner)` instead of the no-arg constructor."));
+    }
+
+    @Test
+    public void sessionWindowedDeserializerShouldThrowNpeIfNotInitializedProperly() {
+        final SessionWindowedDeserializer<byte[]> deserializer = new SessionWindowedDeserializer<>();
+        final NullPointerException exception = assertThrows(
+            NullPointerException.class,
+            () -> deserializer.deserialize("topic", new byte[0]));
+        assertThat(
+            exception.getMessage(),
+            equalTo("Inner deserializer is `null`. User code must use constructor " +
+                "`SessionWindowedDeserializer(final Deserializer<T> inner)` instead of the no-arg constructor."));
+    }
+
+    @Test
+    public void timeWindowedSerializerShouldNotThrowOnCloseIfNotInitializedProperly() {
+        new TimeWindowedSerializer<>().close();
+    }
+
+    @Test
+    public void timeWindowedDeserializerShouldNotThrowOnCloseIfNotInitializedProperly() {
+        new TimeWindowedDeserializer<>().close();
+    }
+
+    @Test
+    public void sessionWindowedSerializerShouldNotThrowOnCloseIfNotInitializedProperly() {
+        new SessionWindowedSerializer<>().close();
+    }
+
+    @Test
+    public void sessionWindowedDeserializerShouldNotThrowOnCloseIfNotInitializedProperly() {
+        new SessionWindowedDeserializer<>().close();
+    }
+
 }


Mime
View raw message