kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: add unit test for StateStoreSerdes
Date Fri, 13 Oct 2017 05:52:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 34188b4cc -> 9806c1b17


MINOR: add unit test for StateStoreSerdes

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #4064 from mjsax/minor-add-state-serdes-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9806c1b1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9806c1b1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9806c1b1

Branch: refs/heads/trunk
Commit: 9806c1b176a8a07016ad18e0a6843cc486e52cc6
Parents: 34188b4
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Thu Oct 12 22:52:54 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Oct 12 22:52:54 2017 -0700

----------------------------------------------------------------------
 .../kafka/common/serialization/Serdes.java      |  3 +-
 .../apache/kafka/streams/state/StateSerdes.java | 14 ++-
 .../kafka/streams/state/StateSerdesTest.java    | 89 ++++++++++++++++++++
 3 files changed, 96 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9806c1b1/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
index 4772ea5..d6b4d2d 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
@@ -151,7 +151,8 @@ public class Serdes {
         }
 
         // TODO: we can also serializes objects of type T using generic Java serialization by default
-        throw new IllegalArgumentException("Unknown class for built-in serializer");
+        throw new IllegalArgumentException("Unknown class for built-in serializer. Supported types are: " +
+            "String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/9806c1b1/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index 2a54cb5..f1de82f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 
+import java.util.Objects;
+
 /**
  * Factory for creating serializers / deserializers for state stores in Kafka Streams.
  *
@@ -64,15 +66,9 @@ public final class StateSerdes<K, V> {
     public StateSerdes(final String topic,
                        final Serde<K> keySerde,
                        final Serde<V> valueSerde) {
-        if (topic == null) {
-            throw new IllegalArgumentException("topic cannot be null");
-        }
-        if (keySerde == null) {
-            throw new IllegalArgumentException("key serde cannot be null");
-        }
-        if (valueSerde == null) {
-            throw new IllegalArgumentException("value serde cannot be null");
-        }
+        Objects.requireNonNull(topic, "topic cannot be null");
+        Objects.requireNonNull(keySerde, "key serde cannot be null");
+        Objects.requireNonNull(valueSerde, "value serde cannot be null");
 
         this.topic = topic;
         this.keySerde = keySerde;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9806c1b1/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
new file mode 100644
index 0000000..6f29888
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class StateSerdesTest {
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfTopicNameIsNullForBuiltinTypes() {
+        StateSerdes.withBuiltinTypes(null, byte[].class, byte[].class);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfKeyClassIsNullForBuiltinTypes() {
+        StateSerdes.withBuiltinTypes("anyName", null, byte[].class);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfValueClassIsNullForBuiltinTypes() {
+        StateSerdes.withBuiltinTypes("anyName", byte[].class, null);
+    }
+
+    @Test
+    public void shouldReturnSerdesForBuiltInKeyAndValueTypesForBuiltinTypes() {
+        final Class[] supportedBuildInTypes = new Class[] {
+            String.class,
+            Short.class,
+            Integer.class,
+            Long.class,
+            Float.class,
+            Double.class,
+            byte[].class,
+            ByteBuffer.class,
+            Bytes.class
+        };
+
+        for (final Class keyClass : supportedBuildInTypes) {
+            for (final Class valueClass : supportedBuildInTypes) {
+                Assert.assertNotNull(StateSerdes.withBuiltinTypes("anyName", keyClass, valueClass));
+            }
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowForUnknownKeyTypeForBuiltinTypes() {
+        StateSerdes.withBuiltinTypes("anyName", Class.class, byte[].class);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowForUnknownValueTypeForBuiltinTypes() {
+        StateSerdes.withBuiltinTypes("anyName", byte[].class, Class.class);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfTopicNameIsNull() {
+        new StateSerdes<>(null, Serdes.ByteArray(), Serdes.ByteArray());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfKeyClassIsNull() {
+        new StateSerdes<>("anyName", null, Serdes.ByteArray());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfValueClassIsNull() {
+        new StateSerdes<>("anyName", Serdes.ByteArray(), null);
+    }
+
+}


Mime
View raw message