kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR: Java8 cleanup (#6598)
Date Sat, 20 Apr 2019 01:44:36 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3aa9095  MINOR: Java8 cleanup (#6598)
3aa9095 is described below

commit 3aa909575d3f978a5045200986eb692eebd47ab8
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Fri Apr 19 18:44:27 2019 -0700

    MINOR: Java8 cleanup (#6598)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../kafka/streams/tests/StreamsUpgradeTest.java    | 53 ++++++++++------------
 1 file changed, 23 insertions(+), 30 deletions(-)

diff --git a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 2e108b2..797c1e8 100644
--- a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,17 +16,17 @@
  */
 package org.apache.kafka.streams.tests;
 
-import java.util.Properties;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
+import java.util.Properties;
+
 public class StreamsUpgradeTest {
 
 
@@ -59,40 +59,33 @@ public class StreamsUpgradeTest {
         final KafkaStreams streams = new KafkaStreams(builder.build(), config);
         streams.start();
 
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                streams.close();
-                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
-                System.out.flush();
-            }
-        });
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            streams.close();
+            System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+            System.out.flush();
+        }));
     }
 
     private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
-        return new ProcessorSupplier<K, V>() {
-            public Processor<K, V> get() {
-                return new AbstractProcessor<K, V>() {
-                    private int numRecordsProcessed = 0;
-
-                    @Override
-                    public void init(final ProcessorContext context) {
-                        System.out.println("[2.1] initializing processor: topic=data taskId=" + context.taskId());
-                        numRecordsProcessed = 0;
-                    }
+        return () -> new AbstractProcessor<K, V>() {
+            private int numRecordsProcessed = 0;
 
-                    @Override
-                    public void process(final K key, final V value) {
-                        numRecordsProcessed++;
-                        if (numRecordsProcessed % 100 == 0) {
-                            System.out.println("processed " + numRecordsProcessed + " records from topic=data");
-                        }
-                    }
+            @Override
+            public void init(final ProcessorContext context) {
+                System.out.println("[2.1] initializing processor: topic=data taskId=" + context.taskId());
+                numRecordsProcessed = 0;
+            }
 
-                    @Override
-                    public void close() {}
-                };
+            @Override
+            public void process(final K key, final V value) {
+                numRecordsProcessed++;
+                if (numRecordsProcessed % 100 == 0) {
+                    System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                }
             }
+
+            @Override
+            public void close() {}
         };
     }
 }


Mime
View raw message