From commits-return-10731-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Thu Nov 15 01:29:33 2018 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F21BF186DA for ; Thu, 15 Nov 2018 01:29:32 +0000 (UTC) Received: (qmail 76979 invoked by uid 500); 15 Nov 2018 01:29:32 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 76939 invoked by uid 500); 15 Nov 2018 01:29:32 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 76930 invoked by uid 99); 15 Nov 2018 01:29:32 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Nov 2018 01:29:32 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 7D244857C3; Thu, 15 Nov 2018 01:29:31 +0000 (UTC) Date: Thu, 15 Nov 2018 01:29:30 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: MINOR: Remove deprecated callers (#5911) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <154224536990.12176.7842071500272406989@gitbox.apache.org> From: guozhang@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: 545d40d83c73445d0417b1bf3b6e979501888ae7 X-Git-Newrev: 14d3ead19d250f2f3117af473ff6244c663ef8ca X-Git-Rev: 14d3ead19d250f2f3117af473ff6244c663ef8ca X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 14d3ead MINOR: Remove deprecated callers (#5911) 14d3ead is described below commit 14d3ead19d250f2f3117af473ff6244c663ef8ca Author: Guozhang Wang AuthorDate: Wed Nov 14 17:29:19 2018 -0800 MINOR: Remove deprecated callers (#5911) Callers of 1) Windows#until, 2) Windows#of, 3) Serialized are replaced when possible with the new APIs. Reviewers: Matthias J. Sax , Bill Bejeck --- .../examples/pageview/PageViewTypedDemo.java | 4 +-- .../examples/pageview/PageViewUntypedDemo.java | 4 +-- .../apache/kafka/streams/kstream/JoinWindows.java | 4 +-- .../org/apache/kafka/streams/kstream/KStream.java | 2 +- .../org/apache/kafka/streams/kstream/KTable.java | 2 +- .../kafka/streams/kstream/KeyValueMapper.java | 4 +-- .../kafka/streams/kstream/SessionWindows.java | 2 +- .../KStreamAggregationDedupIntegrationTest.java | 10 ++---- .../KStreamAggregationIntegrationTest.java | 16 ++++------ .../kafka/streams/kstream/JoinWindowsTest.java | 34 ++++++++------------ .../kstream/RepartitionTopicNamingTest.java | 29 +++++++++-------- .../kafka/streams/kstream/SessionWindowsTest.java | 17 ++++------ .../kafka/streams/kstream/TimeWindowsTest.java | 25 ++++++--------- .../kstream/internals/KGroupedStreamImplTest.java | 4 +-- .../kstream/internals/KGroupedTableImplTest.java | 13 +++----- .../streams/kstream/internals/KStreamImplTest.java | 37 +++++++++++----------- .../internals/KStreamWindowAggregateTest.java | 2 +- .../kstream/internals/KStreamWindowReduceTest.java | 6 ++-- .../kstream/internals/KTableAggregateTest.java | 6 ++-- .../streams/kstream/internals/KTableImplTest.java | 6 ++-- .../internals/KTableKTableLeftJoinTest.java | 25 +++------------ .../internals/KTableTransformValuesTest.java | 7 ++-- .../internals/SessionWindowedKStreamImplTest.java | 4 +-- .../kstream/internals/SuppressScenarioTest.java | 7 ++-- .../internals/TimeWindowedKStreamImplTest.java | 4 +-- .../apache/kafka/streams/perf/YahooBenchmark.java | 4 +-- .../streams/tests/BrokerCompatibilityTest.java | 4 +-- .../kafka/streams/tests/SmokeTestClient.java | 7 ++-- 28 files changed, 125 insertions(+), 164 deletions(-) diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java index 18b4912..f09ac80 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.TimeWindows; @@ -170,7 +171,6 @@ public class PageViewTypedDemo { public String region; } - @SuppressWarnings("deprecation") public static void main(final String[] args) { final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed"); @@ -206,7 +206,7 @@ public class PageViewTypedDemo { return viewByRegion; }) .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion)) - .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), new JSONSerde<>())) + .groupByKey(Grouped.with(Serdes.String(), new JSONSerde<>())) .windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1))) .count() .toStream() diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java index d492238..2a9972b 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; @@ -54,7 +55,6 @@ import java.util.Properties; */ public class PageViewUntypedDemo { - @SuppressWarnings("deprecation") public static void main(final String[] args) throws Exception { final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped"); @@ -87,7 +87,7 @@ public class PageViewUntypedDemo { }) .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion)) - .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), jsonSerde)) + .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) .windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1))) .count() .toStream() diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 62eade4..2087009 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -246,7 +246,7 @@ public final class JoinWindows extends Windows { * @param durationMs the window retention time in milliseconds * @return itself * @throws IllegalArgumentException if {@code durationMs} is smaller than the window size - * @deprecated since 2.1. Use {@link JoinWindows#grace(Duration)} instead. + * @deprecated since 2.1. Use {@link Materialized#withRetention(Duration)} instead. */ @SuppressWarnings("deprecation") @Override @@ -264,7 +264,7 @@ public final class JoinWindows extends Windows { * For {@link TimeWindows} the maintain duration is at least as small as the window size. * * @return the window maintain duration - * @deprecated since 2.1. Use {@link JoinWindows#gracePeriodMs()} instead. + * @deprecated since 2.1. This function should not be used anymore as retention period can be specified via {@link Materialized#withRetention(Duration)}. */ @SuppressWarnings("deprecation") @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 77987a9..6055199 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -874,7 +874,7 @@ public interface KStream { * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key. *

* This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}. - * If the key type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Serialized)} instead. + * If the key type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Grouped)} instead. * * @param selector a {@link KeyValueMapper} that computes a new key for grouping * @param the key type of the result {@link KGroupedStream} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index b49f3e4..5ed0270 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -573,7 +573,7 @@ public interface KTable { * records to and rereading all updated records from it, such that the resulting {@link KGroupedTable} is partitioned * on the new key. *

- * If the key or value type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Serialized)} + * If the key or value type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Grouped)} * instead. * * @param selector a {@link KeyValueMapper} that computes a new grouping key and value to be aggregated diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java index 2a56a05..1112fbb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java @@ -39,9 +39,9 @@ import org.apache.kafka.streams.KeyValue; * @see KStream#flatMap(KeyValueMapper) * @see KStream#selectKey(KeyValueMapper) * @see KStream#groupBy(KeyValueMapper) - * @see KStream#groupBy(KeyValueMapper, Serialized) + * @see KStream#groupBy(KeyValueMapper, Grouped) * @see KTable#groupBy(KeyValueMapper) - * @see KTable#groupBy(KeyValueMapper, Serialized) + * @see KTable#groupBy(KeyValueMapper, Grouped) * @see KTable#toStream(KeyValueMapper) */ public interface KeyValueMapper { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java index 02c7cbf..526c9d1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java @@ -89,7 +89,7 @@ public final class SessionWindows { * @return a new window specification with default maintain duration of 1 day * * @throws IllegalArgumentException if {@code inactivityGapMs} is zero or negative - * @deprecated User {@link #with(Duration)} instead. + * @deprecated Use {@link #with(Duration)} instead. */ @Deprecated public static SessionWindows with(final long inactivityGapMs) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 1db23a5..7493b06 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KeyValueMapper; @@ -78,7 +79,6 @@ public class KStreamAggregationDedupIntegrationTest { private Reducer reducer; private KStream stream; - @SuppressWarnings("deprecation") @Before public void before() throws InterruptedException { testNo++; @@ -96,10 +96,7 @@ public class KStreamAggregationDedupIntegrationTest { final KeyValueMapper mapper = MockMapper.selectValueMapper(); stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String())); - groupedStream = stream - .groupBy( - mapper, - org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String())); + groupedStream = stream.groupBy(mapper, Grouped.with(Serdes.String(), Serdes.String())); reducer = (value1, value2) -> value1 + ":" + value2; } @@ -173,14 +170,13 @@ public class KStreamAggregationDedupIntegrationTest { ); } - @SuppressWarnings("deprecation") @Test public void shouldGroupByKey() throws Exception { final long timestamp = mockTime.milliseconds(); produceMessages(timestamp); produceMessages(timestamp); - stream.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.Integer(), Serdes.String())) + stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String())) .windowedBy(TimeWindows.of(ofMillis(500L))) .count(Materialized.as("count-windows")) .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 718483b..04cc0e1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; @@ -92,7 +93,7 @@ import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -@SuppressWarnings({"unchecked", "deprecation"}) +@SuppressWarnings("unchecked") @Category({IntegrationTest.class}) public class KStreamAggregationIntegrationTest { private static final int NUM_BROKERS = 1; @@ -135,10 +136,7 @@ public class KStreamAggregationIntegrationTest { final KeyValueMapper mapper = MockMapper.selectValueMapper(); stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String())); - groupedStream = stream - .groupBy( - mapper, - org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String())); + groupedStream = stream.groupBy(mapper, Grouped.with(Serdes.String(), Serdes.String())); reducer = (value1, value2) -> value1 + ":" + value2; initializer = () -> 0; @@ -428,7 +426,7 @@ public class KStreamAggregationIntegrationTest { produceMessages(timestamp); produceMessages(timestamp); - stream.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.Integer(), Serdes.String())) + stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String())) .windowedBy(TimeWindows.of(ofMillis(500L))) .count() .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); @@ -521,7 +519,7 @@ public class KStreamAggregationIntegrationTest { final CountDownLatch latch = new CountDownLatch(11); builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(SessionWindows.with(ofMillis(sessionGap))) .count() .toStream() @@ -619,7 +617,7 @@ public class KStreamAggregationIntegrationTest { final CountDownLatch latch = new CountDownLatch(11); final String userSessionsStore = "UserSessionsStore"; builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(SessionWindows.with(ofMillis(sessionGap))) .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore)) .toStream() @@ -706,7 +704,7 @@ public class KStreamAggregationIntegrationTest { final CountDownLatch latch = new CountDownLatch(5); builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(UnlimitedWindows.of().startOn(ofEpochMilli(startTime))) .count() .toStream() diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java index 7127c9f..dda2da4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java @@ -72,12 +72,11 @@ public class JoinWindowsTest { } } - @Deprecated @Test - public void untilShouldSetMaintainDuration() { + public void untilShouldSetGraceDuration() { final JoinWindows windowSpec = JoinWindows.of(ofMillis(ANY_SIZE)); final long windowSize = windowSpec.size(); - assertEquals(windowSize, windowSpec.until(windowSize).maintainMs()); + assertEquals(windowSize, windowSpec.grace(ofMillis(windowSize)).gracePeriodMs()); } @Deprecated @@ -115,16 +114,16 @@ public class JoinWindowsTest { verifyEquality(JoinWindows.of(ofMillis(3)).grace(ofMillis(2)), JoinWindows.of(ofMillis(3)).grace(ofMillis(2))); - verifyEquality(JoinWindows.of(ofMillis(3)).until(60), JoinWindows.of(ofMillis(3)).until(60)); + verifyEquality(JoinWindows.of(ofMillis(3)).grace(ofMillis(60)), JoinWindows.of(ofMillis(3)).grace(ofMillis(60))); verifyEquality( - JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60), - JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60) + JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)), + JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)) ); // JoinWindows is a little weird in that before and after set the same fields as of. verifyEquality( - JoinWindows.of(ofMillis(9)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60), - JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60) + JoinWindows.of(ofMillis(9)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)), + JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)) ); } @@ -138,27 +137,22 @@ public class JoinWindowsTest { verifyInEquality(JoinWindows.of(ofMillis(3)).grace(ofMillis(9)), JoinWindows.of(ofMillis(3)).grace(ofMillis(2))); - verifyInEquality(JoinWindows.of(ofMillis(3)).until(90), JoinWindows.of(ofMillis(3)).until(60)); - + verifyInEquality(JoinWindows.of(ofMillis(3)).grace(ofMillis(90)), JoinWindows.of(ofMillis(3)).grace(ofMillis(60))); - verifyInEquality( - JoinWindows.of(ofMillis(3)).before(ofMillis(9)).after(ofMillis(2)).grace(ofMillis(3)).until(60), - JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60) - ); verifyInEquality( - JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(9)).grace(ofMillis(3)).until(60), - JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60) + JoinWindows.of(ofMillis(3)).before(ofMillis(9)).after(ofMillis(2)).grace(ofMillis(3)), + JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)) ); verifyInEquality( - JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(9)).until(60), - JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60) + JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(9)).grace(ofMillis(3)), + JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)) ); verifyInEquality( - JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(90), - JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60) + JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(9)), + JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)) ); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java index 3f43691..929bc3f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.processor.AbstractProcessor; import org.junit.Test; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -102,8 +103,8 @@ public class RepartitionTopicNamingTest { try { final StreamsBuilder builder = new StreamsBuilder(); final KGroupedStream kGroupedStream = builder.stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping")); - kGroupedStream.windowedBy(TimeWindows.of(10)).count(); - kGroupedStream.windowedBy(TimeWindows.of(30)).count(); + kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count(); + kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count(); builder.build(); fail("Should not build re-using repartition topic name"); } catch (final TopologyException te) { @@ -121,8 +122,8 @@ public class RepartitionTopicNamingTest { final KStream stream2 = builder.stream("topic2").selectKey((k, v) -> k); final KStream stream3 = builder.stream("topic3").selectKey((k, v) -> k); - final KStream joined = stream1.join(stream2, (v1, v2) -> v1 + v2, JoinWindows.of(30), Joined.named("join-repartition")); - joined.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.of(30), Joined.named("join-repartition")); + final KStream joined = stream1.join(stream2, (v1, v2) -> v1 + v2, JoinWindows.of(Duration.ofMillis(30L)), Joined.named("join-repartition")); + joined.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.of(Duration.ofMillis(30L)), Joined.named("join-repartition")); builder.build(); fail("Should not build re-using repartition topic name"); } catch (final TopologyException te) { @@ -137,8 +138,8 @@ public class RepartitionTopicNamingTest { final Properties properties = new Properties(); properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); final KGroupedStream kGroupedStream = builder.stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping")); - kGroupedStream.windowedBy(TimeWindows.of(10)).count(); - kGroupedStream.windowedBy(TimeWindows.of(30)).count(); + kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count(); + kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count(); builder.build(properties); } @@ -272,9 +273,9 @@ public class RepartitionTopicNamingTest { if (isGroupByKey) { if (otherOperations) { - selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(10)).count(); + selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count(); } else { - selectKeyStream.groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(10)).count(); + selectKeyStream.groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count(); } } else { if (otherOperations) { @@ -297,15 +298,15 @@ public class RepartitionTopicNamingTest { final String groupedSessionWindowRepartitionTopicName = "session-window-grouping"; if (isGroupByKey) { if (otherOperations) { - selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count(); + selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count(); } else { - selectKeyStream.groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count(); + selectKeyStream.groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count(); } } else { if (otherOperations) { - selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count(); + selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count(); } else { - selectKeyStream.groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count(); + selectKeyStream.groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count(); } } @@ -356,7 +357,7 @@ public class RepartitionTopicNamingTest { final String joinRepartitionTopicName = "my-join"; updatedStreamOne.join(updatedStreamTwo, (v1, v2) -> v1 + v2, - JoinWindows.of(1000), Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), joinRepartitionTopicName)); + JoinWindows.of(Duration.ofMillis(1000L)), Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), joinRepartitionTopicName)); return builder.build().describe().toString(); } @@ -403,7 +404,7 @@ public class RepartitionTopicNamingTest { mappedStream.filter((k, v) -> k.equals("A")) .join(countStream, (v1, v2) -> v1 + ":" + v2.toString(), - JoinWindows.of(5000), + JoinWindows.of(Duration.ofMillis(5000L)), Joined.with(Serdes.String(), Serdes.String(), Serdes.Long(), fourthRepartitionTopicName)) .to(JOINED_TOPIC); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java index 369bbad..eac978c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java @@ -33,11 +33,10 @@ public class SessionWindowsTest { assertEquals(anyGap, SessionWindows.with(ofMillis(anyGap)).inactivityGap()); } - @Deprecated @Test - public void shouldSetWindowRetentionTime() { + public void shouldSetWindowGraceTime() { final long anyRetentionTime = 42L; - assertEquals(anyRetentionTime, SessionWindows.with(ofMillis(1)).until(anyRetentionTime).maintainMs()); + assertEquals(anyRetentionTime, SessionWindows.with(ofMillis(1)).grace(ofMillis(anyRetentionTime)).gracePeriodMs()); } @@ -88,9 +87,9 @@ public class SessionWindowsTest { verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(6)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6))); - verifyEquality(SessionWindows.with(ofMillis(1)).until(7), SessionWindows.with(ofMillis(1)).until(7)); + verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(7)), SessionWindows.with(ofMillis(1)).grace(ofMillis(7))); - verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(7), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(7)); + verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).grace(ofMillis(7)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).grace(ofMillis(7))); } @Test @@ -99,12 +98,10 @@ public class SessionWindowsTest { verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(9)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6))); - verifyInEquality(SessionWindows.with(ofMillis(1)).until(9), SessionWindows.with(ofMillis(1)).until(7)); - - verifyInEquality(SessionWindows.with(ofMillis(2)).grace(ofMillis(6)).until(7), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(7)); + verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(9)), SessionWindows.with(ofMillis(1)).grace(ofMillis(7))); - verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(0)).until(7), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(7)); + verifyInEquality(SessionWindows.with(ofMillis(2)).grace(ofMillis(6)).grace(ofMillis(7)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6))); - verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(70), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(7)); + verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(0)).grace(ofMillis(7)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6))); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java index 2bdb3a0..dcb691b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java @@ -160,11 +160,11 @@ public class TimeWindowsTest { verifyEquality(TimeWindows.of(ofMillis(3)).grace(ofMillis(1)), TimeWindows.of(ofMillis(3)).grace(ofMillis(1))); - verifyEquality(TimeWindows.of(ofMillis(3)).until(4), TimeWindows.of(ofMillis(3)).until(4)); + verifyEquality(TimeWindows.of(ofMillis(3)).grace(ofMillis(4)), TimeWindows.of(ofMillis(3)).grace(ofMillis(4))); verifyEquality( - TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).until(4), - TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).until(4) + TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).grace(ofMillis(4)), + TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).grace(ofMillis(4)) ); } @@ -176,27 +176,22 @@ public class TimeWindowsTest { verifyInEquality(TimeWindows.of(ofMillis(3)).grace(ofMillis(2)), TimeWindows.of(ofMillis(3)).grace(ofMillis(1))); - verifyInEquality(TimeWindows.of(ofMillis(3)).until(9), TimeWindows.of(ofMillis(3)).until(4)); + verifyInEquality(TimeWindows.of(ofMillis(3)).grace(ofMillis(9)), TimeWindows.of(ofMillis(3)).grace(ofMillis(4))); verifyInEquality( - TimeWindows.of(ofMillis(4)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(4), - TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(4) + TimeWindows.of(ofMillis(4)).advanceBy(ofMillis(2)).grace(ofMillis(2)), + TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)) ); verifyInEquality( - TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(2)).until(4), - TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(4) + TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(2)), + TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)) ); assertNotEquals( - TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(1)).until(4), - TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(4) - ); - - assertNotEquals( - TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(9), - TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(4) + TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(1)), + TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)) ); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java index 26b7e24..f2fc4f8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; @@ -78,11 +79,10 @@ public class KGroupedStreamImplTest { private final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); - @SuppressWarnings("deprecation") @Before public void before() { final KStream stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())); - groupedStream = stream.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String())); + groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String())); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java index c06cce4..99f1b81 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KGroupedTable; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; @@ -204,13 +205,11 @@ public class KGroupedTableImplTest { } } - @SuppressWarnings({"unchecked", "deprecation"}) + @SuppressWarnings("unchecked") @Test public void shouldCountAndMaterializeResults() { final KTable table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String())); - table.groupBy(MockMapper.selectValueKeyValueMapper(), - org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), - Serdes.String())) + table.groupBy(MockMapper.selectValueKeyValueMapper(), Grouped.with(Serdes.String(), Serdes.String())) .count(Materialized.>as("count") .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long())); @@ -223,13 +222,11 @@ public class KGroupedTableImplTest { } } - @SuppressWarnings({"unchecked", "deprecation"}) + @SuppressWarnings("unchecked") @Test public void shouldAggregateAndMaterializeResults() { final KTable table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String())); - table.groupBy(MockMapper.selectValueKeyValueMapper(), - org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), - Serdes.String())) + table.groupBy(MockMapper.selectValueKeyValueMapper(), Grouped.with(Serdes.String(), Serdes.String())) .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 1f3492c..772836f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.GlobalKTable; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; @@ -54,6 +55,7 @@ import org.apache.kafka.test.StreamsTestUtils; import org.junit.Before; import org.junit.Test; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -188,7 +190,6 @@ public class KStreamImplTest { TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null).processors().size()); } - @SuppressWarnings("deprecation") @Test public void shouldPreserveSerdesForOperators() { final StreamsBuilder builder = new StreamsBuilder(); @@ -264,28 +265,28 @@ public class KStreamImplTest { assertEquals(((AbstractStream) stream1.groupByKey()).keySerde(), consumedInternal.keySerde()); assertEquals(((AbstractStream) stream1.groupByKey()).valueSerde(), consumedInternal.valueSerde()); - assertEquals(((AbstractStream) stream1.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).keySerde(), mySerde); - assertEquals(((AbstractStream) stream1.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).valueSerde(), mySerde); + assertEquals(((AbstractStream) stream1.groupByKey(Grouped.with(mySerde, mySerde))).keySerde(), mySerde); + assertEquals(((AbstractStream) stream1.groupByKey(Grouped.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals(((AbstractStream) stream1.groupBy(selector)).keySerde(), null); assertEquals(((AbstractStream) stream1.groupBy(selector)).valueSerde(), consumedInternal.valueSerde()); - assertEquals(((AbstractStream) stream1.groupBy(selector, org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).keySerde(), mySerde); - assertEquals(((AbstractStream) stream1.groupBy(selector, org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).valueSerde(), mySerde); + assertEquals(((AbstractStream) stream1.groupBy(selector, Grouped.with(mySerde, mySerde))).keySerde(), mySerde); + assertEquals(((AbstractStream) stream1.groupBy(selector, Grouped.with(mySerde, mySerde))).valueSerde(), mySerde); - assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L))).keySerde(), null); - assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null); - assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde); - assertNull(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde()); + assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).keySerde(), null); + assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).valueSerde(), null); + assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde); + assertNull(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).valueSerde()); - assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L))).keySerde(), null); - assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null); - assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde); - assertNull(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde()); + assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).keySerde(), null); + assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).valueSerde(), null); + assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde); + assertNull(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).valueSerde()); - assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L))).keySerde(), null); - assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null); - assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde); - assertNull(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde()); + assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).keySerde(), null); + assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).valueSerde(), null); + assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde); + assertNull(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).valueSerde()); assertEquals(((AbstractStream) stream1.join(table1, joiner)).keySerde(), consumedInternal.keySerde()); assertEquals(((AbstractStream) stream1.join(table1, joiner)).valueSerde(), null); @@ -384,7 +385,7 @@ public class KStreamImplTest { }); stream.join(kStream, valueJoiner, - JoinWindows.of(ofMillis(windowSize)).until(3 * windowSize), + JoinWindows.of(ofMillis(windowSize)).grace(ofMillis(3 * windowSize)), Joined.with(Serdes.String(), Serdes.String(), Serdes.String())) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index b82c8fe..068062b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -260,7 +260,7 @@ public class KStreamWindowAggregateTest { final String topic = "topic"; final KStream stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String())); - stream1.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String())) + stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).until(100)) .aggregate( () -> "", diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java index 790c563..634cb2f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.test.ConsumerRecordFactory; @@ -51,14 +52,13 @@ public class KStreamWindowReduceTest { private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); private final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); - @SuppressWarnings("deprecation") @Test public void shouldLogAndMeterOnNullKey() { final StreamsBuilder builder = new StreamsBuilder(); builder .stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(TimeWindows.of(ofMillis(500L))) .reduce((value1, value2) -> value1 + "+" + value2); @@ -80,7 +80,7 @@ public class KStreamWindowReduceTest { final StreamsBuilder builder = new StreamsBuilder(); builder .stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(TimeWindows.of(ofMillis(5L)).until(100)) .reduce((value1, value2) -> value1 + "+" + value2) .toStream() diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index 01b2609..0452c06 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; @@ -55,8 +56,7 @@ public class KTableAggregateTest { private final Serde stringSerde = Serdes.String(); private final Consumed consumed = Consumed.with(stringSerde, stringSerde); - private final org.apache.kafka.streams.kstream.Serialized stringSerialzied = - org.apache.kafka.streams.kstream.Serialized.with(stringSerde, stringSerde); + private final Grouped stringSerialzied = Grouped.with(stringSerde, stringSerde); private final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); private File stateDir = null; @@ -362,7 +362,7 @@ public class KTableAggregateTest { public KeyValue apply(final Long key, final String value) { return new KeyValue<>(value, key); } - }, org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.Long())) + }, Grouped.with(Serdes.String(), Serdes.Long())) .reduce(new Reducer() { @Override public Long apply(final Long value1, final Long value2) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index 5747fed..d0ed50b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.TopologyTestDriverWrapper; import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Materialized; @@ -135,7 +136,6 @@ public class KTableImplTest { } @Test - @SuppressWarnings("deprecation") public void shouldPreserveSerdesForOperators() { final StreamsBuilder builder = new StreamsBuilder(); final KTable table1 = builder.table("topic-2", stringConsumed); @@ -184,8 +184,8 @@ public class KTableImplTest { assertEquals(((AbstractStream) table1.groupBy(KeyValue::new)).keySerde(), null); assertEquals(((AbstractStream) table1.groupBy(KeyValue::new)).valueSerde(), null); - assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).keySerde(), mySerde); - assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).valueSerde(), mySerde); + assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(), mySerde); + assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals(((AbstractStream) table1.join(table1, joiner)).keySerde(), consumedInternal.keySerde()); assertEquals(((AbstractStream) table1.join(table1, joiner)).valueSerde(), null); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java index 1cd360a..f5d74b2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java @@ -18,19 +18,17 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.TopologyWrapper; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.MockProcessorContext; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; -import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockReducer; @@ -344,7 +342,6 @@ public class KTableKTableLeftJoinTest { * It is based on a fairly complicated join used by the developer that reported the bug. * Before the fix this would trigger an IllegalStateException. */ - @SuppressWarnings("deprecation") @Test public void shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions() { final String agg = "agg"; @@ -360,16 +357,8 @@ public class KTableKTableLeftJoinTest { final Consumed consumed = Consumed.with(Serdes.Long(), Serdes.String()); final KTable aggTable = builder .table(agg, consumed) - .groupBy( - new KeyValueMapper>() { - @Override - public KeyValue apply(final Long key, final String value) { - return new KeyValue<>(key, value); - } - }, - org.apache.kafka.streams.kstream.Serialized.with(Serdes.Long(), Serdes.String()) - ) - .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, Materialized.>as("agg-store")); + .groupBy(KeyValue::new, Grouped.with(Serdes.Long(), Serdes.String())) + .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, Materialized.as("agg-store")); final KTable one = builder.table(tableOne, consumed); final KTable two = builder.table(tableTwo, consumed); @@ -378,12 +367,8 @@ public class KTableKTableLeftJoinTest { final KTable five = builder.table(tableFive, consumed); final KTable six = builder.table(tableSix, consumed); - final ValueMapper mapper = new ValueMapper() { - @Override - public String apply(final String value) { - return value.toUpperCase(Locale.ROOT); - } - }; + final ValueMapper mapper = value -> value.toUpperCase(Locale.ROOT); + final KTable seven = one.mapValues(mapper); final KTable eight = six.leftJoin(seven, MockValueJoiner.TOSTRING_JOINER); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java index 82ada52..350b0d2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.ValueMapper; @@ -359,7 +360,6 @@ public class KTableTransformValuesTest { assertThat(keyValueStore.get("C"), is("C->null!")); } - @SuppressWarnings("deprecation") @Test public void shouldCalculateCorrectOldValuesIfMaterializedEvenIfStateful() { builder @@ -369,7 +369,7 @@ public class KTableTransformValuesTest { Materialized.>as(QUERYABLE_NAME) .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Integer())) - .groupBy(toForceSendingOfOldValues(), org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.Integer())) + .groupBy(toForceSendingOfOldValues(), Grouped.with(Serdes.String(), Serdes.Integer())) .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR) .mapValues(mapBackToStrings()) .toStream() @@ -387,13 +387,12 @@ public class KTableTransformValuesTest { assertThat(keyValueStore.get("A"), is(3)); } - @SuppressWarnings("deprecation") @Test public void shouldCalculateCorrectOldValuesIfNotStatefulEvenIfNotMaterialized() { builder .table(INPUT_TOPIC, CONSUMED) .transformValues(new StatelessTransformerSupplier()) - .groupBy(toForceSendingOfOldValues(), org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.Integer())) + .groupBy(toForceSendingOfOldValues(), Grouped.with(Serdes.String(), Serdes.Integer())) .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR) .mapValues(mapBackToStrings()) .toStream() diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java index 749d5d6..57d7b14 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Merger; @@ -65,11 +66,10 @@ public class SessionWindowedKStreamImplTest { }; private SessionWindowedKStream stream; - @SuppressWarnings("deprecation") @Before public void before() { final KStream stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())); - this.stream = stream.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String())) + this.stream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(SessionWindows.with(ofMillis(500))); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java index 57b860e..b0b87be 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java @@ -359,7 +359,7 @@ public class SuppressScenarioTest { final KTable, Long> valueCounts = builder .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE)) .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE)) - .windowedBy(TimeWindows.of(2L).grace(ofMillis(1L))) + .windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(1L))) .count(Materialized.>as("counts").withCachingDisabled()); valueCounts .suppress(untilWindowCloses(unbounded())) @@ -410,7 +410,7 @@ public class SuppressScenarioTest { final KTable, Long> valueCounts = builder .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE)) .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE)) - .windowedBy(TimeWindows.of(2L).grace(ofMillis(2L))) + .windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(2L))) .count(Materialized.>as("counts").withCachingDisabled().withKeySerde(STRING_SERDE)); valueCounts .suppress(untilWindowCloses(unbounded())) @@ -459,14 +459,13 @@ public class SuppressScenarioTest { } } - @SuppressWarnings("deprecation") @Test public void shouldSupportFinalResultsForSessionWindows() { final StreamsBuilder builder = new StreamsBuilder(); final KTable, Long> valueCounts = builder .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE)) .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE)) - .windowedBy(SessionWindows.with(5L).grace(ofMillis(5L))) + .windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(5L))) .count(Materialized.>as("counts").withCachingDisabled()); valueCounts .suppress(untilWindowCloses(unbounded())) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java index c67b9e6..ce0f25e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.TimeWindowedKStream; @@ -58,11 +59,10 @@ public class TimeWindowedKStreamImplTest { private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); private TimeWindowedKStream windowedStream; - @SuppressWarnings("deprecation") @Before public void before() { final KStream stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())); - windowedStream = stream.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String())) + windowedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(TimeWindows.of(ofMillis(500L))); } diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java index de1a621..ec7f2fc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java @@ -33,6 +33,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; @@ -251,7 +252,6 @@ public class YahooBenchmark { } } - @SuppressWarnings("deprecation") private KafkaStreams createYahooBenchmarkStreams(final Properties streamConfig, final String campaignsTopic, final String eventsTopic, final CountDownLatch latch, final int numRecords) { final Map serdeProps = new HashMap<>(); @@ -333,7 +333,7 @@ public class YahooBenchmark { // calculate windowed counts keyedByCampaign - .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(TimeWindows.of(Duration.ofMillis(10 * 1000))) .count(Materialized.>as("time-windows")); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java index 623c3e3..47f74d8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.ValueMapper; import java.io.IOException; @@ -46,7 +47,6 @@ public class BrokerCompatibilityTest { private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic"; private static final String SINK_TOPIC = "brokerCompatibilitySinkTopic"; - @SuppressWarnings("deprecation") public static void main(final String[] args) throws IOException { if (args.length < 2) { System.err.println("BrokerCompatibilityTest are expecting two parameters: propFile, eosEnabled; but only see " + args.length + " parameter"); @@ -83,7 +83,7 @@ public class BrokerCompatibilityTest { final StreamsBuilder builder = new StreamsBuilder(); - builder.stream(SOURCE_TOPIC).groupByKey(org.apache.kafka.streams.kstream.Serialized.with(stringSerde, stringSerde)) + builder.stream(SOURCE_TOPIC).groupByKey(Grouped.with(stringSerde, stringSerde)) .count() .toStream() .mapValues(new ValueMapper() { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index ef908f4..a396ad1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; @@ -110,7 +111,6 @@ public class SmokeTestClient extends SmokeTestUtil { return fullProps; } - @SuppressWarnings("deprecation") private static KafkaStreams createKafkaStreams(final Properties props) { final StreamsBuilder builder = new StreamsBuilder(); final Consumed stringIntConsumed = Consumed.with(stringSerde, intSerde); @@ -125,8 +125,7 @@ public class SmokeTestClient extends SmokeTestUtil { data.process(SmokeTestUtil.printProcessorSupplier("data")); // min - final KGroupedStream groupedData = - data.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(stringSerde, intSerde)); + final KGroupedStream groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde)); groupedData .windowedBy(TimeWindows.of(Duration.ofDays(1))) @@ -239,7 +238,7 @@ public class SmokeTestClient extends SmokeTestUtil { // test repartition final Agg agg = new Agg(); - cntTable.groupBy(agg.selector(), org.apache.kafka.streams.kstream.Serialized.with(stringSerde, longSerde)) + cntTable.groupBy(agg.selector(), Grouped.with(stringSerde, longSerde)) .aggregate(agg.init(), agg.adder(), agg.remover(), Materialized.as(Stores.inMemoryKeyValueStore("cntByCnt")) .withKeySerde(Serdes.String())