kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From boy...@apache.org
Subject [kafka] branch trunk updated: MINOR: Remove Diamond and code code Alignment (#8107)
Date Fri, 26 Jun 2020 06:24:31 GMT
This is an automated email from the ASF dual-hosted git repository.

boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 43f4b1b  MINOR: Remove Diamond and code code Alignment (#8107)
43f4b1b is described below

commit 43f4b1b7b57c22b1f852afa256088b8b98f0392d
Author: high.lee <yello1109@daum.net>
AuthorDate: Fri Jun 26 15:22:58 2020 +0900

    MINOR: Remove Diamond and code code Alignment (#8107)
    
    Minor cleanup on streams internal classes, with diamond class removal and long function signature breakdown.
    
    Reviewers: Boyang Chen <boyang@confluent.io>, John Roesler <john@confluent.io>
---
 .../kafka/streams/kstream/internals/KStreamAggregate.java     |  4 +++-
 .../kafka/streams/kstream/internals/KStreamKStreamJoin.java   |  6 +++++-
 .../kafka/streams/kstream/internals/KStreamKTableJoin.java    | 11 ++++-------
 .../org/apache/kafka/streams/state/internals/ThreadCache.java |  4 ++--
 4 files changed, 14 insertions(+), 11 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 3c3bdd0..359507b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -39,7 +39,9 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
 
     private boolean sendOldValues = false;
 
-    KStreamAggregate(final String storeName, final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator) {
+    KStreamAggregate(final String storeName,
+                     final Initializer<T> initializer,
+                     final Aggregator<? super K, ? super V, T> aggregator) {
         this.storeName = storeName;
         this.initializer = initializer;
         this.aggregator = aggregator;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index d57078e..8412037 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -42,7 +42,11 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
     private final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
     private final boolean outer;
 
-    KStreamKStreamJoin(final String otherWindowName, final long joinBeforeMs, final long joinAfterMs, final ValueJoiner<? super V1, ? super V2, ? extends R> joiner, final boolean outer) {
+    KStreamKStreamJoin(final String otherWindowName,
+                       final long joinBeforeMs,
+                       final long joinAfterMs,
+                       final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
+                       final boolean outer) {
         this.otherWindowName = otherWindowName;
         this.joinBeforeMs = joinBeforeMs;
         this.joinAfterMs = joinAfterMs;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
index ee926bc..5631175 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
@@ -23,17 +23,14 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 class KStreamKTableJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
 
-    private final KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
-        @Override
-        public K apply(final K key, final V1 value) {
-            return key;
-        }
-    };
+    private final KeyValueMapper<K, V1, K> keyValueMapper = (key, value) -> key;
     private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
     private final ValueJoiner<? super V1, ? super V2, R> joiner;
     private final boolean leftJoin;
 
-    KStreamKTableJoin(final KTableValueGetterSupplier<K, V2> valueGetterSupplier, final ValueJoiner<? super V1, ? super V2, R> joiner, final boolean leftJoin) {
+    KStreamKTableJoin(final KTableValueGetterSupplier<K, V2> valueGetterSupplier,
+                      final ValueJoiner<? super V1, ? super V2, R> joiner,
+                      final boolean leftJoin) {
         this.valueGetterSupplier = valueGetterSupplier;
         this.joiner = joiner;
         this.leftJoin = leftJoin;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 2899573..0179536 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -180,7 +180,7 @@ public class ThreadCache {
     public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to) {
         final NamedCache cache = getCache(namespace);
         if (cache == null) {
-            return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
+            return new MemoryLRUCacheBytesIterator(Collections.emptyIterator(), new NamedCache(namespace, this.metrics));
         }
         return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to), cache);
     }
@@ -188,7 +188,7 @@ public class ThreadCache {
     public MemoryLRUCacheBytesIterator all(final String namespace) {
         final NamedCache cache = getCache(namespace);
         if (cache == null) {
-            return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
+            return new MemoryLRUCacheBytesIterator(Collections.emptyIterator(), new NamedCache(namespace, this.metrics));
         }
         return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache);
     }


Mime
View raw message