kafka-commits mailing list archives

From guozh...@apache.org
Subject [1/5] kafka git commit: KAFKA-4490: Add Global Table support to Kafka Streams
Date Thu, 12 Jan 2017 19:46:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c063172c2 -> 8079c980a


http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
index 4861e7c..870a397 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
@@ -22,6 +22,8 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 
 public class MockKeyValueMapper {
 
+
+
     private static class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> {
         @Override
         public KeyValue<K, V> apply(K key, V value) {
@@ -43,6 +45,18 @@ public class MockKeyValueMapper {
         }
     }
 
+    private static class SelectKeyMapper<K, V> implements KeyValueMapper<K, V, K> {
+        @Override
+        public K apply(K key, V value) {
+            return key;
+        }
+    }
+
+    public static <K, V> KeyValueMapper<K, V, K> SelectKeyKeyValueMapper() {
+        return new SelectKeyMapper<>();
+    }
+
+
     public static <K, V> KeyValueMapper<K, V, KeyValue<K, V>> NoOpKeyValueMapper() {
         return new NoOpKeyValueMapper<>();
     }

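For illustration, a minimal usage sketch (hypothetical test class and values, not part of this patch) of the new SelectKeyKeyValueMapper factory:

    import org.apache.kafka.streams.kstream.KeyValueMapper;
    import org.apache.kafka.test.MockKeyValueMapper;

    public class SelectKeyMapperExample {
        public static void main(String[] args) {
            // The factory returns a mapper that echoes the record key unchanged,
            // usable wherever a test needs a KeyValueMapper<K, V, K>.
            KeyValueMapper<String, Long, String> mapper = MockKeyValueMapper.SelectKeyKeyValueMapper();
            System.out.println(mapper.apply("key-1", 42L)); // prints key-1
        }
    }
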
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 9ec2dfd..5ae7112 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -146,6 +146,11 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol
     }
 
     @Override
+    public void initialized() {
+
+    }
+
+    @Override
     public File stateDir() {
         if (stateDir == null)
             throw new UnsupportedOperationException("State directory not specified");

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index cf8a526..d4c8334 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.test;
 
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 
 import java.util.Collections;
@@ -30,6 +31,7 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
     public int numReceived = 0;
 
     public final MockProcessorSupplier<K, V> supplier;
+    public boolean initialized;
 
     public MockProcessorNode(long scheduleInterval) {
         this(new MockProcessorSupplier<K, V>(scheduleInterval));
@@ -42,6 +44,12 @@ public class MockProcessorNode<K, V> {
     }
 
     @Override
+    public void init(final ProcessorContext context) {
+        super.init(context);
+        initialized = true;
+    }
+
+    @Override
     public void process(K key, V value) {
         this.numReceived++;
         processor().process(key, value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index 176501a..096c64a 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -19,6 +19,7 @@ package org.apache.kafka.test;
 
 
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 
 import java.util.ArrayList;
@@ -32,6 +33,7 @@ public class MockSourceNode<K, V> extends SourceNode<K, V> {
     public int numReceived = 0;
     public final ArrayList<K> keys = new ArrayList<>();
     public final ArrayList<V> values = new ArrayList<>();
+    public boolean initialized;
 
     public MockSourceNode(String[] topics, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) {
         super(NAME + INDEX.getAndIncrement(), topics, keyDeserializer, valDeserializer);
@@ -43,4 +45,10 @@ public class MockSourceNode<K, V> {
         this.keys.add(key);
         this.values.add(value);
     }
+
+    @Override
+    public void init(final ProcessorContext context) {
+        super.init(context);
+        initialized = true;
+    }
 }

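The initialized flags added to MockProcessorNode and MockSourceNode let a test assert that the topology actually called init(...) on the node. A rough sketch (hypothetical test class; assumes SourceNode.init makes no further demands on the context than the no-op mock below satisfies):

    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.test.MockSourceNode;
    import org.apache.kafka.test.NoOpProcessorContext;

    public class InitFlagExample {
        public static void main(String[] args) {
            MockSourceNode<String, String> source = new MockSourceNode<>(
                    new String[]{"topic"}, new StringDeserializer(), new StringDeserializer());
            System.out.println(source.initialized); // false before init(...)
            source.init(new NoOpProcessorContext()); // NoOpProcessorContext is introduced below
            System.out.println(source.initialized); // true
        }
    }
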
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
new file mode 100644
index 0000000..2f94878
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class NoOpProcessorContext extends AbstractProcessorContext {
+    public boolean initialized;
+    public Map forwardedValues = new HashMap();
+
+    public NoOpProcessorContext() {
+        super(new TaskId(1, 1), "appId", streamsConfig(), new MockStreamsMetrics(new Metrics()), null, null);
+    }
+
+    static StreamsConfig streamsConfig() {
+        final Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "boot");
+        return new StreamsConfig(props);
+    }
+
+    @Override
+    public StateStore getStateStore(final String name) {
+        return null;
+    }
+
+    @Override
+    public void schedule(final long interval) {
+
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value) {
+        forwardedValues.put(key, value);
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value, final int childIndex) {
+        forward(key, value);
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value, final String childName) {
+        forward(key, value);
+    }
+
+    @Override
+    public void commit() {
+    }
+
+    @Override
+    public void initialized() {
+        initialized = true;
+    }
+
+    @Override
+    public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) {
+        // no-op
+    }
+}

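The new NoOpProcessorContext records every forwarded key-value pair, so a processor under test can be driven directly and its output inspected afterwards. A minimal sketch (hypothetical test class, arbitrary values):

    import org.apache.kafka.test.NoOpProcessorContext;

    public class ForwardCaptureExample {
        public static void main(String[] args) {
            NoOpProcessorContext context = new NoOpProcessorContext();
            context.forward("key", "value"); // all forward(...) overloads funnel into the map
            System.out.println(context.forwardedValues); // {key=value}
            System.out.println(context.initialized);     // false until initialized() is called
        }
    }
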
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
new file mode 100644
index 0000000..b7fc981
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+
+public class NoOpReadOnlyStore<K, V>
+        implements ReadOnlyKeyValueStore<K, V>, StateStore {
+
+    private final String name;
+    private boolean open = true;
+    public boolean initialized;
+    public boolean flushed;
+
+
+    public NoOpReadOnlyStore() {
+        this("");
+    }
+
+    public NoOpReadOnlyStore(final String name) {
+        this.name = name;
+    }
+
+    @Override
+    public V get(final K key) {
+        return null;
+    }
+
+    @Override
+    public KeyValueIterator<K, V> range(final K from, final K to) {
+        return null;
+    }
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        return null;
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return 0L;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        this.initialized = true;
+    }
+
+    @Override
+    public void flush() {
+        flushed = true;
+    }
+
+    @Override
+    public void close() {
+        open = false;
+    }
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+}

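NoOpReadOnlyStore exposes its lifecycle as public flags, which is all most of the global-state tests need to verify. A minimal sketch (hypothetical test class):

    import org.apache.kafka.test.NoOpReadOnlyStore;

    public class StoreLifecycleExample {
        public static void main(String[] args) {
            NoOpReadOnlyStore<String, String> store = new NoOpReadOnlyStore<>("global-store");
            store.init(null, store); // the mock only flips a flag, so a null context is acceptable here
            store.flush();
            store.close();
            System.out.println(store.initialized + " " + store.flushed + " " + store.isOpen());
            // prints: true true false
        }
    }
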
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 3cf0624..0850b60 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -43,6 +43,9 @@ import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
+import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -55,6 +58,9 @@ import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
+
+import java.io.IOException;
+
 /**
  * This class makes it easier to write tests to verify the behavior of topologies created with a {@link TopologyBuilder}.
  * You can test simple topologies that have a single processor, or very complex topologies that have multiple sources, processors,
@@ -137,13 +143,16 @@ public class ProcessorTopologyTestDriver {
 
     private final TaskId id;
     private final ProcessorTopology topology;
-    private final StreamTask task;
     private final MockConsumer<byte[], byte[]> consumer;
     private final MockProducer<byte[], byte[]> producer;
     private final MockConsumer<byte[], byte[]> restoreStateConsumer;
     private final Map<String, TopicPartition> partitionsByTopic = new HashMap<>();
     private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<>();
     private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>();
+    private final ProcessorTopology globalTopology;
+    private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<>();
+    private StreamTask task;
+    private GlobalStateUpdateTask globalStateTask;
 
     /**
      * Create a new test driver instance.
@@ -154,6 +163,7 @@ public class ProcessorTopologyTestDriver {
     public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) {
         id = new TaskId(0, 0);
         topology = builder.setApplicationId("ProcessorTopologyTestDriver").build(null);
+        globalTopology  = builder.buildGlobalStateTopology();
 
         // Set up the consumer and producer ...
         consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
@@ -171,20 +181,46 @@ public class ProcessorTopologyTestDriver {
             partitionsByTopic.put(topic, tp);
             offsetsByTopicPartition.put(tp, new AtomicLong());
         }
+
+
+
         consumer.assign(offsetsByTopicPartition.keySet());
-        StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
-        task = new StreamTask(id,
-            applicationId,
-            partitionsByTopic.values(),
-            topology,
-            consumer,
-            restoreStateConsumer,
-            config,
-            streamsMetrics,
-            new StateDirectory(applicationId, TestUtils.tempDirectory().getPath()),
-            new ThreadCache("testCache", 1024 * 1024, streamsMetrics),
-            new MockTime(),
-            new RecordCollectorImpl(producer, "id"));
+
+        final StateDirectory stateDirectory = new StateDirectory(applicationId, TestUtils.tempDirectory().getPath());
+        final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
+        final ThreadCache cache = new ThreadCache("mock", 1024 * 1024, streamsMetrics);
+
+        if (globalTopology != null) {
+            final MockConsumer<byte[], byte[]> globalConsumer = createGlobalConsumer();
+            for (final String topicName : globalTopology.sourceTopics()) {
+                List<PartitionInfo> partitionInfos = new ArrayList<>();
+                partitionInfos.add(new PartitionInfo(topicName , 1, null, null, null));
+                globalConsumer.updatePartitions(topicName, partitionInfos);
+                final TopicPartition partition = new TopicPartition(topicName, 1);
+                globalConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
+                globalPartitionsByTopic.put(topicName, partition);
+                offsetsByTopicPartition.put(partition, new AtomicLong());
+            }
+            final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory);
+            globalStateTask = new GlobalStateUpdateTask(globalTopology,
+                                                        new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache),
+                                                        stateManager);
+            globalStateTask.initialize();
+        }
+
+        if (!partitionsByTopic.isEmpty()) {
+            task = new StreamTask(id,
+                                  applicationId,
+                                  partitionsByTopic.values(),
+                                  topology,
+                                  consumer,
+                                  restoreStateConsumer,
+                                  config,
+                                  streamsMetrics, stateDirectory,
+                                  cache,
+                                  new MockTime(),
+                                  new RecordCollectorImpl(producer, "id"));
+        }
     }
 
     /**
@@ -196,26 +232,35 @@ public class ProcessorTopologyTestDriver {
      */
     public void process(String topicName, byte[] key, byte[] value) {
         TopicPartition tp = partitionsByTopic.get(topicName);
-        if (tp == null) {
-            throw new IllegalArgumentException("Unexpected topic: " + topicName);
-        }
-        // Add the record ...
-        long offset = offsetsByTopicPartition.get(tp).incrementAndGet();
-        task.addRecords(tp, records(new ConsumerRecord<byte[], byte[]>(tp.topic(), tp.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, value)));
-        producer.clear();
-        // Process the record ...
-        task.process();
-        ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(0L, offset, tp.partition(), topicName));
-        task.commit();
-        // Capture all the records sent to the producer ...
-        for (ProducerRecord<byte[], byte[]> record : producer.history()) {
-            Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
-            if (outputRecords == null) {
-                outputRecords = new LinkedList<>();
-                outputRecordsByTopic.put(record.topic(), outputRecords);
+        if (tp != null) {
+            // Add the record ...
+            long offset = offsetsByTopicPartition.get(tp).incrementAndGet();
+            task.addRecords(tp, records(new ConsumerRecord<>(tp.topic(), tp.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, value)));
+            producer.clear();
+            // Process the record ...
+            task.process();
+            ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(0L, offset, tp.partition(), topicName));
+            task.commit();
+            // Capture all the records sent to the producer ...
+            for (ProducerRecord<byte[], byte[]> record : producer.history()) {
+                Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
+                if (outputRecords == null) {
+                    outputRecords = new LinkedList<>();
+                    outputRecordsByTopic.put(record.topic(), outputRecords);
+                }
+                outputRecords.add(record);
+            }
+        } else {
+            final TopicPartition global = globalPartitionsByTopic.get(topicName);
+            if (global == null) {
+                throw new IllegalArgumentException("Unexpected topic: " + topicName);
             }
-            outputRecords.add(record);
+            final long offset = offsetsByTopicPartition.get(global).incrementAndGet();
+            globalStateTask.update(new ConsumerRecord<>(global.topic(), global.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, value));
+            globalStateTask.flushState();
         }
+
+
     }
 
     /**
@@ -304,7 +349,16 @@ public class ProcessorTopologyTestDriver {
      * Close the driver, its topology, and all processors.
      */
     public void close() {
-        task.close();
+        if (task != null) {
+            task.close();
+        }
+        if (globalStateTask != null) {
+            try {
+                globalStateTask.close();
+            } catch (IOException e) {
+                // ignore
+            }
+        }
     }
 
     /**
@@ -346,4 +400,26 @@ public class ProcessorTopologyTestDriver {
         }
         return consumer;
     }
+
+    protected MockConsumer<byte[], byte[]> createGlobalConsumer() {
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
+            @Override
+            public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
+                // do nothing ...
+            }
+
+            @Override
+            public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
+                // do nothing ...
+            }
+
+            @Override
+            public synchronized long position(TopicPartition partition) {
+                // do nothing ...
+                return 0L;
+            }
+        };
+
+        return consumer;
+    }
 }

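With these changes the driver routes records for global source topics to the GlobalStateUpdateTask instead of the StreamTask. A rough usage sketch (topic and store names are hypothetical, and the globalTable(...) overload is an assumption based on the DSL changes elsewhere in this patch series):

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.test.ProcessorTopologyTestDriver;

    public class GlobalTableDriverExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "driver-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // mock clients never connect

            KStreamBuilder builder = new KStreamBuilder();
            // Assumed DSL entry point for a global table over "global-input", backed by "global-store".
            builder.globalTable(Serdes.String(), Serdes.String(), "global-input", "global-store");

            ProcessorTopologyTestDriver driver =
                    new ProcessorTopologyTestDriver(new StreamsConfig(props), builder, "global-store");
            // "global-input" is not a partitioned input topic, so process(...) takes the global path.
            driver.process("global-input", "key".getBytes(), "value".getBytes());
            driver.close();
        }
    }
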
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 2c8a55e..555e622 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -16,8 +16,12 @@ package org.apache.kafka.test;
 
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -53,4 +57,13 @@ public class StreamsTestUtils {
                 new Properties());
     }
 
+
+    public static <K, V> List<KeyValue<K, V>> toList(final Iterator<KeyValue<K, V>> iterator) {
+        final List<KeyValue<K, V>> results = new ArrayList<>();
+
+        while (iterator.hasNext()) {
+            results.add(iterator.next());
+        }
+        return results;
+    }
 }

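The new toList helper drains an iterator of KeyValue pairs into a list for straightforward assertions. A minimal sketch (hypothetical test class):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.test.StreamsTestUtils;

    public class ToListExample {
        public static void main(String[] args) {
            List<KeyValue<String, Integer>> drained = StreamsTestUtils.toList(
                    Arrays.asList(KeyValue.pair("a", 1), KeyValue.pair("b", 2)).iterator());
            System.out.println(drained); // e.g. [KeyValue(a, 1), KeyValue(b, 2)]
        }
    }
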
