kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6567: Remove KStreamWindowReducer (#5922)
Date Tue, 20 Nov 2018 05:28:21 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8dc4d0e  KAFKA-6567: Remove KStreamWindowReducer (#5922)
8dc4d0e is described below

commit 8dc4d0e7872bad4cf7e5ebaf28229acc5667ae2d
Author: Samuel Hawker <sam.b.hawker@gmail.com>
AuthorDate: Tue Nov 20 05:28:13 2018 +0000

    KAFKA-6567: Remove KStreamWindowReducer (#5922)
    
    This pull request removes the final reference to KStreamWindowReduce and replaces it with KStreamWindowAggregate.
    
    Signed-off-by: Samuel Hawker <sam.b.hawker@gmail.com>
    
    I confirm that this contribution is my original work and that I license the work to the project under the project's open source license.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../kstream/internals/KStreamWindowReduce.java     |  34 ------
 .../kstream/internals/TimeWindowedKStreamImpl.java |   6 +-
 .../kstream/internals/KStreamWindowReduceTest.java | 132 ---------------------
 3 files changed, 5 insertions(+), 167 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
deleted file mode 100644
index babe3eb..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windows;
-
-class KStreamWindowReduce<K, V, W extends Window> extends KStreamWindowAggregate<K,
V, V, W> {
-    KStreamWindowReduce(final Windows<W> windows,
-                        final String storeName,
-                        final Reducer<V> reducer) {
-        super(
-            windows,
-            storeName,
-            () -> null,
-            (key, newValue, oldValue) -> oldValue == null ? newValue : reducer.apply(oldValue,
newValue)
-        );
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index ecfe155..dfead3e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -147,7 +147,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends
AbstractStr
         return aggregateBuilder.build(
             REDUCE_NAME,
             materialize(materializedInternal),
-            new KStreamWindowReduce<>(windows, materializedInternal.storeName(), reducer),
+            new KStreamWindowAggregate<>(windows, materializedInternal.storeName(),
aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
             materializedInternal.isQueryable(),
             materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(),
windows.size()) : null,
             materializedInternal.valueSerde());
@@ -216,4 +216,8 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends
AbstractStr
         }
         return builder;
     }
+
+    private Aggregator<K, V, V> aggregatorForReducer(final Reducer<V> reducer)
{
+        return (aggKey, value, aggregate) -> aggregate == null ? value : reducer.apply(aggregate,
value);
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
deleted file mode 100644
index 634cb2f..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Grouped;
-import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.streams.test.ConsumerRecordFactory;
-import org.apache.kafka.streams.test.OutputVerifier;
-import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Test;
-
-import java.util.Properties;
-
-import static java.time.Duration.ofMillis;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.CoreMatchers.hasItems;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-
-public class KStreamWindowReduceTest {
-
-    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
-    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new
StringSerializer(), new StringSerializer());
-
-    @Test
-    public void shouldLogAndMeterOnNullKey() {
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        builder
-            .stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(TimeWindows.of(ofMillis(500L)))
-            .reduce((value1, value2) -> value1 + "+" + value2);
-
-
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props))
{
-            final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-            driver.pipeInput(recordFactory.create("TOPIC", null, "asdf"));
-            LogCaptureAppender.unregister(appender);
-
-            assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total",
"stream-metrics").metricValue());
-            assertThat(appender.getMessages(), hasItem("Skipping record due to null key.
value=[asdf] topic=[TOPIC] partition=[0] offset=[0]"));
-        }
-    }
-
-    @Deprecated // testing deprecated functionality (behavior of until)
-    @Test
-    public void shouldLogAndMeterOnExpiredEvent() {
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        builder
-            .stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(TimeWindows.of(ofMillis(5L)).until(100))
-            .reduce((value1, value2) -> value1 + "+" + value2)
-            .toStream()
-            .map((key, value) -> new KeyValue<>(key.toString(), value))
-            .to("output");
-
-
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props))
{
-            LogCaptureAppender.setClassLoggerToDebug(KStreamWindowReduce.class);
-            final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-            driver.pipeInput(recordFactory.create("TOPIC", "k", "100", 100L));
-            driver.pipeInput(recordFactory.create("TOPIC", "k", "0", 0L));
-            driver.pipeInput(recordFactory.create("TOPIC", "k", "1", 1L));
-            driver.pipeInput(recordFactory.create("TOPIC", "k", "2", 2L));
-            driver.pipeInput(recordFactory.create("TOPIC", "k", "3", 3L));
-            driver.pipeInput(recordFactory.create("TOPIC", "k", "4", 4L));
-            driver.pipeInput(recordFactory.create("TOPIC", "k", "5", 5L));
-            LogCaptureAppender.unregister(appender);
-
-            final Metric dropMetric = driver.metrics().get(new MetricName(
-                "late-record-drop-total",
-                "stream-processor-node-metrics",
-                "The total number of occurrence of late-record-drop operations.",
-                mkMap(
-                    mkEntry("client-id", "topology-test-driver-virtual-thread"),
-                    mkEntry("task-id", "0_0"),
-                    mkEntry("processor-node-id", "KSTREAM-REDUCE-0000000002")
-                )
-            ));
-
-            assertThat(dropMetric.metricValue(), equalTo(5.0));
-            assertThat(appender.getMessages(), hasItems(
-                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0]
offset=[1] timestamp=[0] window=[0,5) expiration=[5]",
-                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0]
offset=[2] timestamp=[1] window=[0,5) expiration=[5]",
-                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0]
offset=[3] timestamp=[2] window=[0,5) expiration=[5]",
-                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0]
offset=[4] timestamp=[3] window=[0,5) expiration=[5]",
-                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0]
offset=[5] timestamp=[4] window=[0,5) expiration=[5]"
-            ));
-
-            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/105]", "100",
100);
-            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/10]", "5", 5);
-            assertThat(driver.readOutput("output"), nullValue());
-        }
-    }
-
-    private ProducerRecord<String, String> getOutput(final TopologyTestDriver driver)
{
-        return driver.readOutput("output", new StringDeserializer(), new StringDeserializer());
-    }
-}


Mime
View raw message