kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: HOTFIX: Add back the copy-constructor of abstract stream
Date Mon, 03 Jul 2017 08:34:17 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ab8e9d175 -> c9f777cc4


HOTFIX: Add back the copy-constructor of abstract stream

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>

Closes #3428 from guozhangwang/KHotfix-add-copy-constructor-abstract-stream


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9f777cc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9f777cc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9f777cc

Branch: refs/heads/trunk
Commit: c9f777cc44269c2e9537eab2451481422e613383
Parents: ab8e9d1
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Mon Jul 3 09:34:16 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Mon Jul 3 09:34:16 2017 +0100

----------------------------------------------------------------------
 .../kstream/internals/AbstractStream.java       |  8 ++
 .../kstream/internals/AbstractStreamTest.java   | 99 ++++++++++++++++++++
 2 files changed, 107 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c9f777cc/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 8aea44d..81d10ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -37,6 +37,14 @@ public abstract class AbstractStream<K> {
     protected final String name;
     protected final Set<String> sourceNodes;
 
+    // This copy-constructor will allow to extend KStream
+    // and KTable APIs with new methods without impacting the public interface.
+    public AbstractStream(AbstractStream<K> stream) {
+        this.topology = stream.topology;
+        this.name = stream.name;
+        this.sourceNodes = stream.sourceNodes;
+    }
+
     AbstractStream(final KStreamBuilder topology, String name, final Set<String> sourceNodes) {
         if (sourceNodes == null || sourceNodes.isEmpty()) {
             throw new IllegalArgumentException("parameter <sourceNodes> must not be null or empty");

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9f777cc/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
new file mode 100644
index 0000000..d482182
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.Random;
+import static org.junit.Assert.assertTrue;
+
+public class AbstractStreamTest {
+
+    private final String topicName = "topic";
+
+    private KStreamTestDriver driver;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+    }
+
+    @Test
+    public void testShouldBeExtensible() {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
+        final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
+
+        ExtendedKStream<Integer, String> stream = new ExtendedKStream<>(builder.stream(Serdes.Integer(), Serdes.String(), topicName));
+
+        stream.randomFilter().process(processor);
+
+        driver = new KStreamTestDriver(builder);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topicName, expectedKey, "V" + expectedKey);
+        }
+
+        assertTrue(processor.processed.size() <= expectedKeys.length);
+    }
+
+    private class ExtendedKStream<K, V> extends AbstractStream<K> {
+
+        ExtendedKStream(KStream<K, V> stream) {
+            super((KStreamImpl<K, V>) stream);
+        }
+
+        KStream<K, V> randomFilter() {
+            String name = this.topology.newName("RANDOM-FILTER-");
+            this.topology.addProcessor(name, new ExtendedKStreamDummy(), this.name);
+            return new KStreamImpl<>(topology, name, sourceNodes, false);
+        }
+    }
+
+    private class ExtendedKStreamDummy<K, V> implements ProcessorSupplier<K, V> {
+
+        private Random rand;
+
+        ExtendedKStreamDummy() {
+            rand = new Random();
+        }
+
+        @Override
+        public Processor<K, V> get() {
+            return new ExtendedKStreamDummyProcessor();
+        }
+
+        private class ExtendedKStreamDummyProcessor extends AbstractProcessor<K, V> {
+            @Override
+            public void process(K key, V value) {
+                // flip a coin and filter
+                if (rand.nextBoolean())
+                    context().forward(key, value);
+            }
+        }
+    }
+}


Mime
View raw message