kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: KIP-182 follow up; add deprecation annotations to test classes
Date Tue, 10 Oct 2017 22:13:33 GMT
Repository: kafka
Updated Branches:
  refs/heads/1.0 85f6f8cbb -> 6c1c5efe7


MINOR: KIP-182 follow up; add deprecation annotations to test classes

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #4051 from mjsax/minor-kip-182-follow-up

(cherry picked from commit a5a9a901ef78373cda4b743323ef9f6e6da0dbf6)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6c1c5efe
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6c1c5efe
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6c1c5efe

Branch: refs/heads/1.0
Commit: 6c1c5efe7f83afc733608122c270642ccdc97399
Parents: 85f6f8c
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Tue Oct 10 15:13:19 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Oct 10 15:13:30 2017 -0700

----------------------------------------------------------------------
 .../KStreamAggregationIntegrationTest.java      |  6 ++-
 .../internals/KGroupedStreamImplTest.java       | 49 +++++++++++++++++---
 .../kafka/streams/tests/EosTestClient.java      | 27 ++++++-----
 3 files changed, 63 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6c1c5efe/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 4169eb2..3500dd5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -151,7 +151,7 @@ public class KStreamAggregationIntegrationTest {
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     }
 
-
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldReduce() throws Exception {
         produceMessages(mockTime.milliseconds());
@@ -259,6 +259,7 @@ public class KStreamAggregationIntegrationTest {
         ));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldAggregate() throws Exception {
         produceMessages(mockTime.milliseconds());
@@ -394,6 +395,7 @@ public class KStreamAggregationIntegrationTest {
         )));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldCount() throws Exception {
         produceMessages(mockTime.milliseconds());
@@ -404,6 +406,7 @@ public class KStreamAggregationIntegrationTest {
         shouldCountHelper();
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldCountWithInternalStore() throws Exception {
         produceMessages(mockTime.milliseconds());
@@ -459,6 +462,7 @@ public class KStreamAggregationIntegrationTest {
 
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldCountSessionWindows() throws Exception {
         final long sessionGap = 5 * 60 * 1000L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6c1c5efe/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 3af35d9..c8b7c18 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -75,16 +75,19 @@ public class KGroupedStreamImplTest {
         groupedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()));
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullReducerOnReduce() {
         groupedStream.reduce(null, "store");
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldAllowNullStoreNameOnReduce() {
         groupedStream.reduce(MockReducer.STRING_ADDER, (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = InvalidTopicException.class)
     public void shouldNotHaveInvalidStoreNameOnReduce() {
         groupedStream.reduce(MockReducer.STRING_ADDER, INVALID_STORE_NAME);
@@ -109,66 +112,79 @@ public class KGroupedStreamImplTest {
         groupedStream.count(TimeWindows.of(10), (StateStoreSupplier<WindowStore>) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullReducerWithWindowedReduce() {
         groupedStream.reduce(null, TimeWindows.of(10), "store");
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullWindowsWithWindowedReduce() {
         groupedStream.reduce(MockReducer.STRING_ADDER, (Windows) null, "store");
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldAllowNullStoreNameWithWindowedReduce() {
         groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = InvalidTopicException.class)
     public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
         groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), INVALID_STORE_NAME);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullInitializerOnAggregate() {
         groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Serdes.String(), "store");
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullAdderOnAggregate() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, null, Serdes.String(), "store");
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldAllowNullStoreNameOnAggregate() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Serdes.String(), null);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = InvalidTopicException.class)
     public void shouldNotHaveInvalidStoreNameOnAggregate() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Serdes.String(), INVALID_STORE_NAME);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullInitializerOnWindowedAggregate() {
         groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), Serdes.String(), "store");
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullAdderOnWindowedAggregate() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, null, TimeWindows.of(10), Serdes.String(), "store");
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullWindowsOnWindowedAggregate() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, Serdes.String(), "store");
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldAllowNullStoreNameOnWindowedAggregate() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), Serdes.String(), null);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = InvalidTopicException.class)
     public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), Serdes.String(), INVALID_STORE_NAME);
@@ -200,10 +216,11 @@ public class KGroupedStreamImplTest {
         assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldAggregateSessionWindows() {
         final Map<Windowed<String>, Integer> results = new HashMap<>();
-        KTable table = groupedStream.aggregate(new Initializer<Integer>() {
+        KTable<Windowed<String>, Integer> table = groupedStream.aggregate(new Initializer<Integer>() {
             @Override
             public Integer apply() {
                 return 0;
@@ -230,10 +247,11 @@ public class KGroupedStreamImplTest {
         assertEquals(table.queryableStoreName(), "session-store");
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldAggregateSessionWindowsWithInternalStoreName() {
         final Map<Windowed<String>, Integer> results = new HashMap<>();
-        KTable table = groupedStream.aggregate(new Initializer<Integer>() {
+        KTable<Windowed<String>, Integer> table = groupedStream.aggregate(new Initializer<Integer>() {
             @Override
             public Integer apply() {
                 return 0;
@@ -279,10 +297,11 @@ public class KGroupedStreamImplTest {
         assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldCountSessionWindows() {
         final Map<Windowed<String>, Long> results = new HashMap<>();
-        KTable table = groupedStream.count(SessionWindows.with(30), "session-store");
+        KTable<Windowed<String>, Long> table = groupedStream.count(SessionWindows.with(30), "session-store");
         table.toStream().foreach(new ForeachAction<Windowed<String>, Long>() {
             @Override
             public void apply(final Windowed<String> key, final Long value) {
@@ -293,10 +312,11 @@ public class KGroupedStreamImplTest {
         assertEquals(table.queryableStoreName(), "session-store");
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldCountSessionWindowsWithInternalStoreName() {
         final Map<Windowed<String>, Long> results = new HashMap<>();
-        KTable table = groupedStream.count(SessionWindows.with(30));
+        KTable<Windowed<String>, Long> table = groupedStream.count(SessionWindows.with(30));
         table.toStream().foreach(new ForeachAction<Windowed<String>, Long>() {
             @Override
             public void apply(final Windowed<String> key, final Long value) {
@@ -327,10 +347,11 @@ public class KGroupedStreamImplTest {
         assertEquals("A:B:C", results.get(new Windowed<>("1", new SessionWindow(70, 100))));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldReduceSessionWindows() {
         final Map<Windowed<String>, String> results = new HashMap<>();
-        KTable table = groupedStream.reduce(
+        KTable<Windowed<String>, String> table = groupedStream.reduce(
                 new Reducer<String>() {
                     @Override
                     public String apply(final String value1, final String value2) {
@@ -348,10 +369,11 @@ public class KGroupedStreamImplTest {
         assertEquals(table.queryableStoreName(), "session-store");
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldReduceSessionWindowsWithInternalStoreName() {
         final Map<Windowed<String>, String> results = new HashMap<>();
-        KTable table = groupedStream.reduce(
+        KTable<Windowed<String>, String> table = groupedStream.reduce(
                 new Reducer<String>() {
                     @Override
                     public String apply(final String value1, final String value2) {
@@ -368,22 +390,25 @@ public class KGroupedStreamImplTest {
         assertNull(table.queryableStoreName());
     }
 
-
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullReducerWhenReducingSessionWindows() {
         groupedStream.reduce(null, SessionWindows.with(10), "store");
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullSessionWindowsReducingSessionWindows() {
         groupedStream.reduce(MockReducer.STRING_ADDER, (SessionWindows) null, "store");
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldAcceptNullStoreNameWhenReducingSessionWindows() {
         groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = InvalidTopicException.class)
     public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
         groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), INVALID_STORE_NAME);
@@ -395,6 +420,7 @@ public class KGroupedStreamImplTest {
         groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), (StateStoreSupplier<SessionStore>) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() {
         groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
@@ -405,6 +431,7 @@ public class KGroupedStreamImplTest {
         }, SessionWindows.with(10), Serdes.String(), "storeName");
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, null, new Merger<String, String>() {
@@ -415,6 +442,7 @@ public class KGroupedStreamImplTest {
         }, SessionWindows.with(10), Serdes.String(), "storeName");
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() {
         groupedStream.aggregate(MockInitializer.STRING_INIT,
@@ -425,6 +453,7 @@ public class KGroupedStreamImplTest {
                 "storeName");
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
@@ -435,6 +464,7 @@ public class KGroupedStreamImplTest {
         }, null, Serdes.String(), "storeName");
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
@@ -445,6 +475,7 @@ public class KGroupedStreamImplTest {
         }, SessionWindows.with(10), Serdes.String(), (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = InvalidTopicException.class)
     public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
@@ -466,16 +497,19 @@ public class KGroupedStreamImplTest {
         }, SessionWindows.with(10), Serdes.String(), (StateStoreSupplier<SessionStore>) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullSessionWindowsWhenCountingSessionWindows() {
         groupedStream.count((SessionWindows) null, "store");
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldAcceptNullStoreNameWhenCountingSessionWindows() {
         groupedStream.count(SessionWindows.with(90), (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = InvalidTopicException.class)
     public void shouldNotAcceptInvalidStoreNameWhenCountingSessionWindows() {
         groupedStream.count(SessionWindows.with(90), INVALID_STORE_NAME);
@@ -612,6 +646,7 @@ public class KGroupedStreamImplTest {
         )));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldCountWindowed() {
         final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/6c1c5efe/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index 5e85bd2..1c11be4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -24,6 +25,9 @@ import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.io.File;
 import java.util.Properties;
@@ -147,9 +151,9 @@ public class EosTestClient extends SmokeTestUtil {
                         return (value < aggregate) ? value : aggregate;
                     }
                 },
-                intSerde,
-                "min")
-            .to(stringSerde, intSerde, "min");
+                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>with(null, intSerde))
+            .toStream()
+            .to("min", Produced.with(stringSerde, intSerde));
 
         // sum
         groupedData.aggregate(
@@ -167,9 +171,9 @@ public class EosTestClient extends SmokeTestUtil {
                     return (long) value + aggregate;
                 }
             },
-            longSerde,
-            "sum")
-            .to(stringSerde, longSerde, "sum");
+            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>with(null, longSerde))
+            .toStream()
+            .to("sum", Produced.with(stringSerde, longSerde));
 
         if (withRepartitioning) {
             final KStream<String, Integer> repartitionedData = data.through("repartition");
@@ -194,13 +198,14 @@ public class EosTestClient extends SmokeTestUtil {
                             return (value > aggregate) ? value : aggregate;
                         }
                     },
-                    intSerde,
-                    "max")
-                .to(stringSerde, intSerde, "max");
+                    Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>with(null, intSerde))
+                .toStream()
+                .to("max", Produced.with(stringSerde, intSerde));
 
             // count
-            groupedDataAfterRepartitioning.count("cnt")
-                .to(stringSerde, longSerde, "cnt");
+            groupedDataAfterRepartitioning.count()
+                .toStream()
+                .to("cnt", Produced.with(stringSerde, longSerde));
         }
 
         return new KafkaStreams(builder.build(), props);


Mime
View raw message