kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: HOTFIX: use ConsumedInternal in StreamsBuilder
Date Fri, 18 May 2018 00:20:29 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ba237c5  HOTFIX: use ConsumedInternal in StreamsBuilder
ba237c5 is described below

commit ba237c5d21abb8b63c5edf53517654a214157582
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu May 17 17:20:12 2018 -0700

    HOTFIX: use ConsumedInternal in StreamsBuilder
---
 .../org/apache/kafka/streams/StreamsBuilder.java     | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index a05a9b3..ead8a76 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -222,9 +222,10 @@ public class StreamsBuilder {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
+        final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
+        materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
         return internalStreamsBuilder.table(topic,
-                                            new ConsumedInternal<>(consumed),
+                                            consumedInternal,
                                             new MaterializedInternal<>(materialized,
internalStreamsBuilder, topic + "-"));
     }
 
@@ -271,10 +272,11 @@ public class StreamsBuilder {
                                                   final Consumed<K, V> consumed) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
+        final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
         return internalStreamsBuilder.table(topic,
-                                            new ConsumedInternal<>(consumed),
+                                            consumedInternal,
                                             new MaterializedInternal<>(
-                                                    Materialized.<K, V, KeyValueStore<Bytes,
byte[]>>with(consumed.keySerde, consumed.valueSerde),
+                                                    Materialized.<K, V, KeyValueStore<Bytes,
byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
                                                     internalStreamsBuilder,
                                                     topic + "-"));
     }
@@ -328,14 +330,15 @@ public class StreamsBuilder {
                                                               final Consumed<K, V>
consumed) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
+        final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized
=
                 new MaterializedInternal<>(
-                        Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde,
consumed.valueSerde),
+                        Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(),
consumedInternal.valueSerde()),
                         internalStreamsBuilder,
                         topic + "-");
 
 
-        return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed),
materialized);
+        return internalStreamsBuilder.globalTable(topic, consumedInternal, materialized);
     }
 
     /**
@@ -396,10 +399,11 @@ public class StreamsBuilder {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
+        final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
         // always use the serdes from consumed
-        materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
+        materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
         return internalStreamsBuilder.globalTable(topic,
-                                                  new ConsumedInternal<>(consumed),
+                                                  consumedInternal,
                                                   new MaterializedInternal<>(materialized,
internalStreamsBuilder, topic + "-"));
     }
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message