kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Use Java 8 lambdas in KStreamImplTest (#6430)
Date Tue, 12 Mar 2019 20:35:42 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9ecadc4  MINOR: Use Java 8 lambdas in KStreamImplTest (#6430)
9ecadc4 is described below

commit 9ecadc4df474d9cfbfda3256f01eba1423cf5902
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Tue Mar 12 16:35:25 2019 -0400

    MINOR: Use Java 8 lambdas in KStreamImplTest (#6430)
    
    Just a minor cleanup to use Java 8 lambdas vs anonymous classes in this test.
    
    I ran all tests in the streams test suite
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
---
 .../main/scala/kafka/tools/StreamsResetter.java    |  2 +-
 .../streams/kstream/internals/KStreamImplTest.java | 73 ++++------------------
 2 files changed, 14 insertions(+), 61 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 3666f67..71529f8 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -257,7 +257,7 @@ public class StreamsResetter {
             CommandLineUtils.printUsageAndDie(optionParser, "Only one of --dry-run and --execute can be specified");
         }
 
-        final scala.collection.immutable.HashSet<OptionSpec<?>> allScenarioOptions = new scala.collection.immutable.HashSet<OptionSpec<?>>();
+        final scala.collection.immutable.HashSet<OptionSpec<?>> allScenarioOptions = new scala.collection.immutable.HashSet<>();
         allScenarioOptions.$plus(toOffsetOption);
         allScenarioOptions.$plus(toDatetimeOption);
         allScenarioOptions.$plus(byDurationOption);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 76a01cd..bd2ab5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -104,78 +104,31 @@ public class KStreamImplTest {
 
         final KStream<String, String> source2 = builder.stream(Arrays.asList("topic-3", "topic-4"), stringConsumed);
 
-        final KStream<String, String> stream1 =
-            source1.filter(new Predicate<String, String>() {
-                @Override
-                public boolean test(final String key, final String value) {
-                    return true;
-                }
-            }).filterNot(new Predicate<String, String>() {
-                @Override
-                public boolean test(final String key, final String value) {
-                    return false;
-                }
-            });
+        final KStream<String, String> stream1 = source1.filter((key, value) -> true)
+                                                       .filterNot((key, value) -> false);
 
-        final KStream<String, Integer> stream2 = stream1.mapValues(new ValueMapper<String, Integer>() {
-            @Override
-            public Integer apply(final String value) {
-                return new Integer(value);
-            }
-        });
+        final KStream<String, Integer> stream2 = stream1.mapValues(Integer::new);
 
-        final KStream<String, Integer> stream3 = source2.flatMapValues(new ValueMapper<String, Iterable<Integer>>() {
-            @Override
-            public Iterable<Integer> apply(final String value) {
-                return Collections.singletonList(new Integer(value));
-            }
-        });
+        final KStream<String, Integer> stream3 = source2.flatMapValues((ValueMapper<String, Iterable<Integer>>)
+            value -> Collections.singletonList(new Integer(value)));
 
         final KStream<String, Integer>[] streams2 = stream2.branch(
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(final String key, final Integer value) {
-                        return (value % 2) == 0;
-                    }
-                },
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(final String key, final Integer value) {
-                        return true;
-                    }
-                }
+            (key, value) -> (value % 2) == 0,
+            (key, value) -> true
         );
 
         final KStream<String, Integer>[] streams3 = stream3.branch(
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(final String key, final Integer value) {
-                        return (value % 2) == 0;
-                    }
-                },
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(final String key, final Integer value) {
-                        return true;
-                    }
-                }
+            (key, value) -> (value % 2) == 0,
+            (key, value) -> true
         );
 
         final int anyWindowSize = 1;
         final Joined<String, Integer, Integer> joined = Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
-        final KStream<String, Integer> stream4 = streams2[0].join(streams3[0], new ValueJoiner<Integer, Integer, Integer>() {
-            @Override
-            public Integer apply(final Integer value1, final Integer value2) {
-                return value1 + value2;
-            }
-        }, JoinWindows.of(ofMillis(anyWindowSize)), joined);
+        final KStream<String, Integer> stream4 = streams2[0].join(streams3[0],
+            (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(anyWindowSize)), joined);
 
-        streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, Integer>() {
-            @Override
-            public Integer apply(final Integer value1, final Integer value2) {
-                return value1 + value2;
-            }
-        }, JoinWindows.of(ofMillis(anyWindowSize)), joined);
+        streams2[1].join(streams3[1], (value1, value2) -> value1 + value2,
+            JoinWindows.of(ofMillis(anyWindowSize)), joined);
 
         stream4.to("topic-5");
 


Mime
View raw message