kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5817: [FOLLOW-UP] add SerializedInternal
Date Mon, 11 Sep 2017 20:07:00 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9bb06c3ee -> d6fabb011


KAFKA-5817: [FOLLOW-UP] add SerializedInternal

Add `SerializedInternal` class and remove getters from `Serialized`

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3825 from dguy/kafka-5817-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d6fabb01
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d6fabb01
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d6fabb01

Branch: refs/heads/trunk
Commit: d6fabb011af848ff96691b5deb97c88a84e1a683
Parents: 9bb06c3
Author: Damian Guy <damian.guy@gmail.com>
Authored: Mon Sep 11 13:06:55 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Sep 11 13:06:55 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/Serialized.java       | 12 +++----
 .../streams/kstream/internals/KStreamImpl.java  | 10 +++---
 .../streams/kstream/internals/KTableImpl.java   |  8 +++--
 .../kstream/internals/SerializedInternal.java   | 35 ++++++++++++++++++++
 4 files changed, 51 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d6fabb01/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
index 18731a5..d69aabd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
@@ -27,8 +27,8 @@ import org.apache.kafka.common.serialization.Serde;
  */
 public class Serialized<K, V> {
 
-    private Serde<K> keySerde;
-    private Serde<V> valueSerde;
+    protected Serde<K> keySerde;
+    protected Serde<V> valueSerde;
 
     private Serialized(final Serde<K> keySerde,
                        final Serde<V> valueSerde) {
@@ -36,12 +36,8 @@ public class Serialized<K, V> {
         this.valueSerde = valueSerde;
     }
 
-    public Serde<K> keySerde() {
-        return keySerde;
-    }
-
-    public Serde<V> valueSerde() {
-        return valueSerde;
+    protected Serialized(final Serialized<K, V> serialized) {
+        this(serialized.keySerde, serialized.valueSerde);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6fabb01/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 41da536..7201a00 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -785,12 +785,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                               final Serialized<KR, V> serialized) {
         Objects.requireNonNull(selector, "selector can't be null");
         Objects.requireNonNull(serialized, "serialized can't be null");
+        final SerializedInternal<KR, V> serializedInternal = new SerializedInternal<>(serialized);
         String selectName = internalSelectKey(selector);
         return new KGroupedStreamImpl<>(builder,
                                         selectName,
                                         sourceNodes,
-                                        serialized.keySerde(),
-                                        serialized.valueSerde(),
+                                        serializedInternal.keySerde(),
+                                        serializedInternal.valueSerde(),
                                         true);
     }
 
@@ -809,11 +810,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized) {
+        final SerializedInternal<K, V> serializedInternal = new SerializedInternal<>(serialized);
         return new KGroupedStreamImpl<>(builder,
                                         this.name,
                                         sourceNodes,
-                                        serialized.keySerde(),
-                                        serialized.valueSerde(),
+                                        serializedInternal.keySerde(),
+                                        serializedInternal.valueSerde(),
                                         this.repartitionRequired);
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6fabb01/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index d3d6ce2..ed7abdc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -691,8 +691,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         // select the aggregate key and values (old and new), it would require parent to send old values
         builder.internalTopologyBuilder.addProcessor(selectName, selectSupplier, this.name);
         this.enableSendingOldValues();
-
-        return new KGroupedTableImpl<>(builder, selectName, this.name, serialized.keySerde(), serialized.valueSerde());
+        final SerializedInternal<K1, V1> serializedInternal  = new SerializedInternal<>(serialized);
+        return new KGroupedTableImpl<>(builder,
+                                       selectName,
+                                       this.name,
+                                       serializedInternal.keySerde(),
+                                       serializedInternal.valueSerde());
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6fabb01/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
new file mode 100644
index 0000000..fb802ea
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.Serialized;
+
+class SerializedInternal<K, V> extends Serialized<K, V> {
+    SerializedInternal(final Serialized<K, V> serialized) {
+        super(serialized);
+    }
+
+    Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    Serde<V> valueSerde() {
+        return valueSerde;
+    }
+
+}


Mime
View raw message