kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [kafka] Diff for: [GitHub] guozhangwang closed pull request #6114: MINOR: Put state args in correct order named repartition test
Date Mon, 14 Jan 2019 00:57:11 GMT
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
index f836baa704f..911716f348e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
@@ -31,6 +32,7 @@
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.time.Duration;
 import java.util.Objects;
@@ -76,7 +78,7 @@ public static void main(final String[] args) throws Exception {
         }
 
         maybeUpdatedStream.groupByKey(Grouped.with("grouped-stream", Serdes.String(), Serdes.String()))
-            .aggregate(initializer, aggregator, Materialized.with(Serdes.String(), Serdes.Integer()))
+            .aggregate(initializer, aggregator, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("count-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer()))
             .toStream()
             .peek((k, v) -> System.out.println(String.format("AGGREGATED key=%s value=%s", k, v)))
             .to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer()));
@@ -95,7 +97,7 @@ public static void main(final String[] args) throws Exception {
         final KafkaStreams streams = new KafkaStreams(topology, config);
 
 
-        streams.setStateListener((oldState, newState) -> {
+        streams.setStateListener((newState, oldState) -> {
             if (oldState == State.REBALANCING && newState == State.RUNNING) {
                 if (addOperators) {
                     System.out.println("UPDATED Topology");


With regards,
Apache Git Services

Mime
View raw message