kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2707: make KStream processor names deterministic
Date Mon, 02 Nov 2015 22:35:14 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1f5d05fe7 -> e466ccd71


KAFKA-2707: make KStream processor names deterministic

guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #408 from ymatsuda/kstream_processor_name


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e466ccd7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e466ccd7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e466ccd7

Branch: refs/heads/trunk
Commit: e466ccd711ae00c5bb046c18aacf353b1a460dcd
Parents: 1f5d05f
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Mon Nov 2 14:40:52 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Nov 2 14:40:52 2015 -0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/KStreamBuilder.java   | 11 +++++-
 .../streams/kstream/internals/KStreamImpl.java  | 41 +++++++++-----------
 .../kstream/internals/KStreamWindowedImpl.java  | 10 ++---
 .../streams/kstream/KStreamBuilderTest.java     | 19 ++++++++-
 4 files changed, 51 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e466ccd7/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 5b3feb6..c8a8bd3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -22,12 +22,15 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 
 import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * KStreamBuilder is the class to create KStream instances.
  */
 public class KStreamBuilder extends TopologyBuilder {
 
+    private final AtomicInteger index = new AtomicInteger(0);
+
     public KStreamBuilder() {
         super();
     }
@@ -40,7 +43,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return KStream
      */
     public <K, V> KStream<K, V> from(String... topics) {
-        String name = KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.getAndIncrement();
+        String name = newName(KStreamImpl.SOURCE_NAME);
 
         addSource(name, topics);
 
@@ -58,10 +61,14 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return KStream
      */
     public <K, V> KStream<K, V> from(Deserializer<? extends K> keyDeserializer, Deserializer<? extends V> valDeserializer, String... topics) {
-        String name = KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.getAndIncrement();
+        String name = newName(KStreamImpl.SOURCE_NAME);
 
         addSource(name, keyDeserializer, valDeserializer, topics);
 
         return new KStreamImpl<>(this, name, Collections.singleton(name));
     }
+
+    public String newName(String prefix) {
+        return prefix + String.format("%010d", index.getAndIncrement());
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e466ccd7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 1ea9b1e..0986405 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
@@ -29,12 +30,10 @@ import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.WindowSupplier;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 
 import java.lang.reflect.Array;
 import java.util.Collections;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class KStreamImpl<K, V> implements KStream<K, V> {
 
@@ -70,13 +69,11 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     public static final String SOURCE_NAME = "KAFKA-SOURCE-";
 
-    public static final AtomicInteger INDEX = new AtomicInteger(1);
-
-    protected final TopologyBuilder topology;
+    protected final KStreamBuilder topology;
     protected final String name;
     protected final Set<String> sourceNodes;
 
-    public KStreamImpl(TopologyBuilder topology, String name, Set<String> sourceNodes) {
+    public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
         this.topology = topology;
         this.name = name;
         this.sourceNodes = sourceNodes;
@@ -84,7 +81,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     @Override
     public KStream<K, V> filter(Predicate<K, V> predicate) {
-        String name = FILTER_NAME + INDEX.getAndIncrement();
+        String name = topology.newName(FILTER_NAME);
 
         topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);
 
@@ -93,7 +90,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     @Override
     public KStream<K, V> filterOut(final Predicate<K, V> predicate) {
-        String name = FILTER_NAME + INDEX.getAndIncrement();
+        String name = topology.newName(FILTER_NAME);
 
         topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
 
@@ -102,7 +99,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     @Override
     public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
-        String name = MAP_NAME + INDEX.getAndIncrement();
+        String name = topology.newName(MAP_NAME);
 
         topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
 
@@ -111,7 +108,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     @Override
     public <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper) {
-        String name = MAPVALUES_NAME + INDEX.getAndIncrement();
+        String name = topology.newName(MAPVALUES_NAME);
 
         topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
 
@@ -120,7 +117,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     @Override
     public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
-        String name = FLATMAP_NAME + INDEX.getAndIncrement();
+        String name = topology.newName(FLATMAP_NAME);
 
         topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
 
@@ -129,7 +126,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     @Override
     public <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> mapper) {
-        String name = FLATMAPVALUES_NAME + INDEX.getAndIncrement();
+        String name = topology.newName(FLATMAPVALUES_NAME);
 
         topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);
 
@@ -138,7 +135,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     @Override
     public KStreamWindowed<K, V> with(WindowSupplier<K, V> windowSupplier) {
-        String name = WINDOWED_NAME + INDEX.getAndIncrement();
+        String name = topology.newName(WINDOWED_NAME);
 
         topology.addProcessor(name, new KStreamWindow<>(windowSupplier), this.name);
 
@@ -148,13 +145,13 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
     @Override
     @SuppressWarnings("unchecked")
     public KStream<K, V>[] branch(Predicate<K, V>... predicates) {
-        String branchName = BRANCH_NAME + INDEX.getAndIncrement();
+        String branchName = topology.newName(BRANCH_NAME);
 
         topology.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);
 
         KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
         for (int i = 0; i < predicates.length; i++) {
-            String childName = BRANCHCHILD_NAME + INDEX.getAndIncrement();
+            String childName = topology.newName(BRANCHCHILD_NAME);
 
             topology.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName);
 
@@ -170,11 +167,11 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
                                             Serializer<V> valSerializer,
                                             Deserializer<K1> keyDeserializer,
                                             Deserializer<V1> valDeserializer) {
-        String sendName = SINK_NAME + INDEX.getAndIncrement();
+        String sendName = topology.newName(SINK_NAME);
 
         topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);
 
-        String sourceName = SOURCE_NAME + INDEX.getAndIncrement();
+        String sourceName = topology.newName(SOURCE_NAME);
 
         topology.addSource(sourceName, keyDeserializer, valDeserializer, topic);
 
@@ -188,21 +185,21 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     @Override
     public void to(String topic) {
-        String name = SINK_NAME + INDEX.getAndIncrement();
+        String name = topology.newName(SINK_NAME);
 
         topology.addSink(name, topic, this.name);
     }
 
     @Override
     public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
-        String name = SINK_NAME + INDEX.getAndIncrement();
+        String name = topology.newName(SINK_NAME);
 
         topology.addSink(name, topic, keySerializer, valSerializer, this.name);
     }
 
     @Override
     public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames) {
-        String name = TRANSFORM_NAME + INDEX.getAndIncrement();
+        String name = topology.newName(TRANSFORM_NAME);
 
         topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
         topology.connectProcessorAndStateStores(name, stateStoreNames);
@@ -212,7 +209,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     @Override
     public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier, String... stateStoreNames) {
-        String name = TRANSFORMVALUES_NAME + INDEX.getAndIncrement();
+        String name = topology.newName(TRANSFORMVALUES_NAME);
 
         topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
         topology.connectProcessorAndStateStores(name, stateStoreNames);
@@ -222,7 +219,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     @Override
     public void process(final ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames) {
-        String name = PROCESSOR_NAME + INDEX.getAndIncrement();
+        String name = topology.newName(PROCESSOR_NAME);
 
         topology.addProcessor(name, processorSupplier, this.name);
         topology.connectProcessorAndStateStores(name, stateStoreNames);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e466ccd7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
index 4e9f4c6..cb49873 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
@@ -19,10 +19,10 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KStreamWindowed;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.WindowSupplier;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 
 import java.util.HashSet;
 import java.util.Set;
@@ -31,7 +31,7 @@ public final class KStreamWindowedImpl<K, V> extends KStreamImpl<K, V> implement
 
     private final WindowSupplier<K, V> windowSupplier;
 
-    public KStreamWindowedImpl(TopologyBuilder topology, String name, Set<String> sourceNodes, WindowSupplier<K, V> windowSupplier) {
+    public KStreamWindowedImpl(KStreamBuilder topology, String name, Set<String> sourceNodes, WindowSupplier<K, V> windowSupplier) {
         super(topology, name, sourceNodes);
         this.windowSupplier = windowSupplier;
     }
@@ -53,9 +53,9 @@ public final class KStreamWindowedImpl<K, V> extends KStreamImpl<K, V> implement
         KStreamJoin<K, V2, V1, V> joinOther = new KStreamJoin<>(thisWindowName, KStreamJoin.reverseJoiner(valueJoiner));
         KStreamPassThrough<K, V2> joinMerge = new KStreamPassThrough<>();
 
-        String joinThisName = JOINTHIS_NAME + INDEX.getAndIncrement();
-        String joinOtherName = JOINOTHER_NAME + INDEX.getAndIncrement();
-        String joinMergeName = JOINMERGE_NAME + INDEX.getAndIncrement();
+        String joinThisName = topology.newName(JOINTHIS_NAME);
+        String joinOtherName = topology.newName(JOINOTHER_NAME);
+        String joinMergeName = topology.newName(JOINMERGE_NAME);
 
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KStreamImpl) other).name);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e466ccd7/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 49171e3..cf1cfaa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -21,6 +21,8 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.processor.TopologyException;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 public class KStreamBuilderTest {
 
     @Test(expected = TopologyException.class)
@@ -29,6 +31,21 @@ public class KStreamBuilderTest {
 
         builder.from("topic-1", "topic-2");
 
-        builder.addSource(KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.decrementAndGet(), "topic-3");
+        builder.addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3");
+    }
+
+    @Test
+    public void testNewName() {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        assertEquals("X-0000000000", builder.newName("X-"));
+        assertEquals("Y-0000000001", builder.newName("Y-"));
+        assertEquals("Z-0000000002", builder.newName("Z-"));
+
+        builder = new KStreamBuilder();
+
+        assertEquals("X-0000000000", builder.newName("X-"));
+        assertEquals("Y-0000000001", builder.newName("Y-"));
+        assertEquals("Z-0000000002", builder.newName("Z-"));
     }
 }


Mime
View raw message