kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-2600: Align Kafka Streams' interfaces with Java 8 functional interfaces
Date Fri, 09 Oct 2015 19:14:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 49822ff83 -> 7233858be


http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 1abb989..50a23ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -31,13 +31,13 @@ import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.MockProcessorDef;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.ProcessorTopologyTestDriver;
 import org.junit.After;
 import org.junit.Before;
@@ -115,8 +115,8 @@ public class ProcessorTopologyTest {
 
         builder.addSource("source-1", "topic-1");
         builder.addSource("source-2", "topic-2", "topic-3");
-        builder.addProcessor("processor-1", new MockProcessorDef(), "source-1");
-        builder.addProcessor("processor-2", new MockProcessorDef(), "source-1", "source-2");
+        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
+        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-1", "source-2");
         builder.addSink("sink-1", "topic-3", "processor-1");
         builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");
 
@@ -308,10 +308,10 @@ public class ProcessorTopologyTest {
         }
     }
 
-    protected ProcessorDef define(final Processor processor) {
-        return new ProcessorDef() {
+    protected ProcessorSupplier define(final Processor processor) {
+        return new ProcessorSupplier() {
             @Override
-            public Processor instance() {
+            public Processor get() {
                 return processor;
             }
         };

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/test/java/org/apache/kafka/test/MockProcessorDef.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorDef.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorDef.java
deleted file mode 100644
index 918b468..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorDef.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorDef;
-
-import java.util.ArrayList;
-
-public class MockProcessorDef<K, V> implements ProcessorDef {
-
-    public final ArrayList<String> processed = new ArrayList<>();
-    public final ArrayList<Long> punctuated = new ArrayList<>();
-
-    public Processor instance() {
-        return new MockProcessor();
-    }
-
-    public class MockProcessor implements Processor<K, V> {
-
-        @Override
-        public void init(ProcessorContext context) {
-            // do nothing
-        }
-
-        @Override
-        public void process(K key, V value) {
-            processed.add(key + ":" + value);
-        }
-
-        @Override
-        public void punctuate(long streamTime) {
-            punctuated.add(streamTime);
-        }
-
-        @Override
-        public void close() {
-            // do nothing
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
new file mode 100644
index 0000000..f1aa167
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.ArrayList;
+
+public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
+
+    public final ArrayList<String> processed = new ArrayList<>();
+    public final ArrayList<Long> punctuated = new ArrayList<>();
+
+    @Override
+    public Processor<K, V> get() {
+        return new MockProcessor();
+    }
+
+    public class MockProcessor implements Processor<K, V> {
+
+        @Override
+        public void init(ProcessorContext context) {
+            // do nothing
+        }
+
+        @Override
+        public void process(K key, V value) {
+            processed.add(key + ":" + value);
+        }
+
+        @Override
+        public void punctuate(long streamTime) {
+            punctuated.add(streamTime);
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7233858b/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java b/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java
index b5a3b3c..04c8f61 100644
--- a/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java
+++ b/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java
@@ -20,14 +20,14 @@ package org.apache.kafka.test;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.WindowDef;
+import org.apache.kafka.streams.kstream.WindowSupplier;
 import org.apache.kafka.streams.kstream.internals.FilteredIterator;
 import org.apache.kafka.streams.processor.internals.Stamped;
 
 import java.util.Iterator;
 import java.util.LinkedList;
 
-public class UnlimitedWindowDef<K, V> implements WindowDef<K, V> {
+public class UnlimitedWindowDef<K, V> implements WindowSupplier<K, V> {
 
     private final String name;
 
@@ -39,7 +39,7 @@ public class UnlimitedWindowDef<K, V> implements WindowDef<K, V> {
         return name;
     }
 
-    public Window<K, V> instance() {
+    public Window<K, V> get() {
         return new UnlimitedWindow();
     }
 


Mime
View raw message