kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: single Jackson serde for PageViewTypedDemo (#5590)
Date Fri, 31 Aug 2018 20:13:48 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d57fe1b  MINOR: single Jackson serde for PageViewTypedDemo (#5590)
d57fe1b is described below

commit d57fe1b053546966e6a867d84ee24dd256bb071a
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Fri Aug 31 15:13:42 2018 -0500

    MINOR: single Jackson serde for PageViewTypedDemo (#5590)
    
    Previously, we depicted creating a Jackson serde for every pojo class, which becomes a burden in practice. There are many ways to avoid this and just have a single serde, so we've decided to model this design choice instead.
    
    Reviewers: Viktor Somogyi <viktorsomogyi@gmail.com>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 checkstyle/import-control.xml                      |   2 +-
 gradle/findbugs-exclude.xml                        |  17 ++
 .../examples/pageview/JsonPOJODeserializer.java    |  61 -------
 .../examples/pageview/JsonPOJOSerializer.java      |  55 -------
 .../examples/pageview/PageViewTypedDemo.java       | 175 +++++++++++++--------
 5 files changed, 128 insertions(+), 182 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 840e551..bd5c11f 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -216,7 +216,7 @@
     <allow pkg="org.apache.kafka.streams"/>
 
     <subpackage name="examples">
-      <allow pkg="com.fasterxml.jackson.databind" />
+      <allow pkg="com.fasterxml.jackson" />
       <allow pkg="org.apache.kafka.connect.json" />
     </subpackage>
 
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index 0998185..62240d8 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -292,4 +292,21 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc
         </Or>
         <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE"/>
     </Match>
+
+    <!-- Suppress warnings for unused members that are undetectably used by Jackson -->
+    <Match>
+        <Package name="org.apache.kafka.streams.examples.pageview"/>
+        <Bug pattern="NP_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"/>
+    </Match>
+    <Match>
+        <Package name="org.apache.kafka.streams.examples.pageview"/>
+        <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"/>
+    </Match>
+    <Match>
+        <Package name="org.apache.kafka.streams.examples.pageview"/>
+        <Bug pattern="UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"/>
+    </Match>
+
+    <!-- END Suppress warnings for unused members that are undetectably used by Jackson -->
+
 </FindBugsFilter>
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
deleted file mode 100644
index d55246c..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.examples.pageview;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Deserializer;
-
-import java.util.Map;
-
-public class JsonPOJODeserializer<T> implements Deserializer<T> {
-    private ObjectMapper objectMapper = new ObjectMapper();
-
-    private Class<T> tClass;
-
-    /**
-     * Default constructor needed by Kafka
-     */
-    public JsonPOJODeserializer() {
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void configure(final Map<String, ?> props, final boolean isKey) {
-        tClass = (Class<T>) props.get("JsonPOJOClass");
-    }
-
-    @Override
-    public T deserialize(final String topic, final byte[] bytes) {
-        if (bytes == null)
-            return null;
-
-        final T data;
-        try {
-            data = objectMapper.readValue(bytes, tClass);
-        } catch (final Exception e) {
-            throw new SerializationException(e);
-        }
-
-        return data;
-    }
-
-    @Override
-    public void close() {
-
-    }
-}
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
deleted file mode 100644
index 81ccf1e..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.examples.pageview;
-
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Serializer;
-
-import java.util.Map;
-
-public class JsonPOJOSerializer<T> implements Serializer<T> {
-    private final ObjectMapper objectMapper = new ObjectMapper();
-
-    /**
-     * Default constructor needed by Kafka
-     */
-    public JsonPOJOSerializer() {
-    }
-    
-    @Override
-    public void configure(final Map<String, ?> props, final boolean isKey) {
-    }
-
-    @Override
-    public byte[] serialize(final String topic, final T data) {
-        if (data == null)
-            return null;
-
-        try {
-            return objectMapper.writeValueAsBytes(data);
-        } catch (final Exception e) {
-            throw new SerializationException("Error serializing JSON message", e);
-        }
-    }
-
-    @Override
-    public void close() {
-    }
-
-}
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 503dbeb..871d836 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -16,23 +16,26 @@
  */
 package org.apache.kafka.streams.examples.pageview;
 
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
@@ -48,35 +51,122 @@ import java.util.concurrent.TimeUnit;
 * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
  *
 * Before running this example you must create the input topics and the output topic (e.g. via
- * bin/kafka-topics.sh --create ...), and write some data to the input topics (e.g. via
- * bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ * bin/kafka-topics --create ...), and write some data to the input topics (e.g. via
+ * bin/kafka-console-producer). Otherwise you won't see any data arriving in the output topic.
+ *
+ * The inputs for this example are:
+ * - Topic: streams-pageview-input
+ *   Key Format: (String) USER_ID
+ *   Value Format: (JSON) {"_t": "pv", "user": (String USER_ID), "page": (String PAGE_ID), "timestamp": (long ms TIMESTAMP)}
+ *
+ * - Topic: streams-userprofile-input
+ *   Key Format: (String) USER_ID
+ *   Value Format: (JSON) {"_t": "up", "region": (String REGION), "timestamp": (long ms TIMESTAMP)}
+ *
+ * To observe the results, read the output topic (e.g., via bin/kafka-console-consumer)
+ * - Topic: streams-pageviewstats-typed-output
+ *   Key Format: (JSON) {"_t": "wpvbr", "windowStart": (long ms WINDOW_TIMESTAMP), "region": (String REGION)}
+ *   Value Format: (JSON) {"_t": "rc", "count": (long REGION_COUNT), "region": (String REGION)}
+ *
+ * Note, the "_t" field is necessary to help Jackson identify the correct class for deserialization in the
+ * generic {@link JSONSerde}. If you instead specify a specific serde per class, you won't need the extra "_t" field.
  */
+@SuppressWarnings({"WeakerAccess", "unused"})
 public class PageViewTypedDemo {
 
+    /**
+     * A serde for any class that implements {@link JSONSerdeCompatible}. Note that the classes also need to
+     * be registered in the {@code @JsonSubTypes} annotation on {@link JSONSerdeCompatible}.
+     *
+     * @param <T> The concrete type of the class that gets de/serialized
+     */
+    public static class JSONSerde<T extends JSONSerdeCompatible> implements Serializer<T>, Deserializer<T>, Serde<T> {
+        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean isKey) {}
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public T deserialize(final String topic, final byte[] data) {
+            if (data == null) {
+                return null;
+            }
+
+            try {
+                return (T) OBJECT_MAPPER.readValue(data, JSONSerdeCompatible.class);
+            } catch (final IOException e) {
+                throw new SerializationException(e);
+            }
+        }
+
+        @Override
+        public byte[] serialize(final String topic, final T data) {
+            if (data == null) {
+                return null;
+            }
+
+            try {
+                return OBJECT_MAPPER.writeValueAsBytes(data);
+            } catch (final Exception e) {
+                throw new SerializationException("Error serializing JSON message", e);
+            }
+        }
+
+        @Override
+        public void close() {}
+
+        @Override
+        public Serializer<T> serializer() {
+            return this;
+        }
+
+        @Override
+        public Deserializer<T> deserializer() {
+            return this;
+        }
+    }
+
+    /**
+     * An interface for registering types that can be de/serialized with {@link JSONSerde}.
+     */
+    @SuppressWarnings("DefaultAnnotationParam") // being explicit for the example
+    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "_t")
+    @JsonSubTypes({
+                      @JsonSubTypes.Type(value = PageView.class, name = "pv"),
+                      @JsonSubTypes.Type(value = UserProfile.class, name = "up"),
+                      @JsonSubTypes.Type(value = PageViewByRegion.class, name = "pvbr"),
+                      @JsonSubTypes.Type(value = WindowedPageViewByRegion.class, name = "wpvbr"),
+                      @JsonSubTypes.Type(value = RegionCount.class, name = "rc")
+                  })
+    public interface JSONSerdeCompatible {
+
+    }
+
     // POJO classes
-    static public class PageView {
+    static public class PageView implements JSONSerdeCompatible {
         public String user;
         public String page;
         public Long timestamp;
     }
 
-    static public class UserProfile {
+    static public class UserProfile implements JSONSerdeCompatible {
         public String region;
         public Long timestamp;
     }
 
-    static public class PageViewByRegion {
+    static public class PageViewByRegion implements JSONSerdeCompatible {
         public String user;
         public String page;
         public String region;
     }
 
-    static public class WindowedPageViewByRegion {
+    static public class WindowedPageViewByRegion implements JSONSerdeCompatible {
         public long windowStart;
         public String region;
     }
 
-    static public class RegionCount {
+    static public class RegionCount implements JSONSerdeCompatible {
         public long count;
         public String region;
     }
@@ -86,67 +176,21 @@ public class PageViewTypedDemo {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSONSerde.class);
+        props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, JSONSerde.class);
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class);
+        props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, JSONSerde.class);
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         final StreamsBuilder builder = new StreamsBuilder();
 
-        // TODO: the following can be removed with a serialization factory
-        final Map<String, Object> serdeProps = new HashMap<>();
-
-        final Serializer<PageView> pageViewSerializer = new JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", PageView.class);
-        pageViewSerializer.configure(serdeProps, false);
-
-        final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", PageView.class);
-        pageViewDeserializer.configure(serdeProps, false);
-
-        final Serde<PageView> pageViewSerde = Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer);
-
-        final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", UserProfile.class);
-        userProfileSerializer.configure(serdeProps, false);
-
-        final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", UserProfile.class);
-        userProfileDeserializer.configure(serdeProps, false);
-
-        final Serde<UserProfile> userProfileSerde = Serdes.serdeFrom(userProfileSerializer, userProfileDeserializer);
-
-        final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
-        wPageViewByRegionSerializer.configure(serdeProps, false);
-
-        final Deserializer<WindowedPageViewByRegion> wPageViewByRegionDeserializer = new JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
-        wPageViewByRegionDeserializer.configure(serdeProps, false);
-
-        final Serde<WindowedPageViewByRegion> wPageViewByRegionSerde = Serdes.serdeFrom(wPageViewByRegionSerializer, wPageViewByRegionDeserializer);
-
-        final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", RegionCount.class);
-        regionCountSerializer.configure(serdeProps, false);
-
-        final Deserializer<RegionCount> regionCountDeserializer = new JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", RegionCount.class);
-        regionCountDeserializer.configure(serdeProps, false);
-        final Serde<RegionCount> regionCountSerde = Serdes.serdeFrom(regionCountSerializer, regionCountDeserializer);
-
-        final Serializer<PageViewByRegion> pageViewByRegionSerializer = new JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", PageViewByRegion.class);
-        pageViewByRegionSerializer.configure(serdeProps, false);
-        final Deserializer<PageViewByRegion> pageViewByRegionDeserializer = new JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", PageViewByRegion.class);
-        pageViewByRegionDeserializer.configure(serdeProps, false);
-        final Serde<PageViewByRegion> pageViewByRegionSerde = Serdes.serdeFrom(pageViewByRegionSerializer, pageViewByRegionDeserializer);
-
-        final KStream<String, PageView> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), pageViewSerde));
+        final KStream<String, PageView> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), new JSONSerde<>()));
 
-        final KTable<String, UserProfile> users = builder.table("streams-userprofile-input",
-                                                                Consumed.with(Serdes.String(), userProfileSerde));
+        final KTable<String, UserProfile> users = builder.table("streams-userprofile-input", Consumed.with(Serdes.String(), new JSONSerde<>()));
 
         final KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
             .leftJoin(users, (view, profile) -> {
@@ -162,7 +206,7 @@ public class PageViewTypedDemo {
                 return viewByRegion;
             })
             .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
-            .groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde))
+            .groupByKey(Serialized.with(Serdes.String(), new JSONSerde<>()))
             .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(7)).advanceBy(TimeUnit.SECONDS.toMillis(1)))
             .count()
             .toStream()
@@ -179,7 +223,7 @@ public class PageViewTypedDemo {
             });
 
         // write to the result topic
-        regionCount.to("streams-pageviewstats-typed-output", Produced.with(wPageViewByRegionSerde, regionCountSerde));
+        regionCount.to("streams-pageviewstats-typed-output");
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         final CountDownLatch latch = new CountDownLatch(1);
@@ -197,6 +241,7 @@ public class PageViewTypedDemo {
             streams.start();
             latch.await();
         } catch (final Throwable e) {
+            e.printStackTrace();
             System.exit(1);
         }
         System.exit(0);


Mime
View raw message