kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: MINOR: always set Serde.Long on count operations
Date Fri, 29 Sep 2017 10:08:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b79b17971 -> 082def05c


MINOR: always set Serde.Long on count operations

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>,
Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>

Closes #3943 from dguy/count-materialized


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/082def05
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/082def05
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/082def05

Branch: refs/heads/trunk
Commit: 082def05ca5af4f30e05aa28ba83fa299f30337b
Parents: b79b179
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri Sep 29 11:06:34 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Fri Sep 29 11:06:34 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/kafka/streams/kstream/KGroupedStream.java  | 2 ++
 .../apache/kafka/streams/kstream/SessionWindowedKStream.java   | 4 +++-
 .../org/apache/kafka/streams/kstream/TimeWindowedKStream.java  | 4 +++-
 .../kafka/streams/kstream/internals/KGroupedStreamImpl.java    | 6 ++++++
 .../streams/kstream/internals/SessionWindowedKStreamImpl.java  | 5 +++++
 .../streams/kstream/internals/TimeWindowedKStreamImpl.java     | 5 +++++
 .../streams/kstream/internals/KGroupedStreamImplTest.java      | 3 +--
 .../kstream/internals/SessionWindowedKStreamImplTest.java      | 3 +--
 8 files changed, 26 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/082def05/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 1ff1759..1c72ebf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -177,6 +178,7 @@ public interface KGroupedStream<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
      * @param materialized  an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
+     *                      Note: the valueSerde will be automatically set to {@link Serdes#Long()} if there is no valueSerde provided
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/082def05/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index d8044ac..3c3ef7e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -90,7 +91,8 @@ public interface SessionWindowedKStream<K, V> {
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}
+     * @param materialized  an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
+     *                      Note: the valueSerde will be automatically set to {@link Serdes#Long()} if there is no valueSerde provided
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/082def05/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 433f4e7..8ef0bd7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -99,7 +100,8 @@ public interface TimeWindowedKStream<K, V> {
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
+     ** @param materialized  an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
+     *                      Note: the valueSerde will be automatically set to {@link Serdes#Long()} if there is no valueSerde provided
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/082def05/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 4943314..64dfd19 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -265,6 +265,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
 
     @Override
     public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal
+                = new MaterializedInternal<>(materialized);
+        if (materializedInternal.valueSerde() == null) {
+            materialized.withValueSerde(Serdes.Long());
+        }
         return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, materialized);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/082def05/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index 0603853..6644c92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -82,6 +82,11 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
     @Override
     public KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> materializedInternal
+                = new MaterializedInternal<>(materialized);
+        if (materializedInternal.valueSerde() == null) {
+            materialized.withValueSerde(Serdes.Long());
+        }
         return aggregate(aggregateBuilder.countInitializer,
                          aggregateBuilder.countAggregator,
                          countMerger,

http://git-wip-us.apache.org/repos/asf/kafka/blob/082def05/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index cc6ca05..daba4c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -72,6 +72,11 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
     @Override
     public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal
+                = new MaterializedInternal<>(materialized);
+        if (materializedInternal.valueSerde() == null) {
+            materialized.withValueSerde(Serdes.Long());
+        }
         return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, materialized);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/082def05/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index c8e011e..3af35d9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -509,8 +509,7 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldCountAndMaterializeResults() {
         groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
-                                    .withKeySerde(Serdes.String())
-                                    .withValueSerde(Serdes.Long()));
+                                    .withKeySerde(Serdes.String()));
 
         processData();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/082def05/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 042f0f1..e75ef5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -130,8 +130,7 @@ public class SessionWindowedKStreamImplTest {
     @Test
     public void shouldMaterializeCount() {
         stream.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("count-store")
-                             .withKeySerde(Serdes.String())
-                             .withValueSerde(Serdes.Long()));
+                             .withKeySerde(Serdes.String()));
 
         processData();
         final SessionStore<String, Long> store = (SessionStore<String, Long>) driver.allStateStores().get("count-store");


Mime
View raw message