kafka-commits mailing list archives

From guozh...@apache.org
Subject [3/6] kafka git commit: KIP-28: Add a processor client for Kafka Streaming
Date Sat, 26 Sep 2015 00:24:22 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
new file mode 100644
index 0000000..e9aaa20
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * An in-memory key-value store based on a TreeMap
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public class InMemoryKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
+
+    public InMemoryKeyValueStore(String name, ProcessorContext context) {
+        this(name, context, new SystemTime());
+    }
+
+    public InMemoryKeyValueStore(String name, ProcessorContext context, Time time) {
+        super(name, new MemoryStore<K, V>(name, context), context, "kafka-streams", time);
+    }
+
+    private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
+
+        private final String name;
+        private final NavigableMap<K, V> map;
+        private final ProcessorContext context;
+
+        public MemoryStore(String name, ProcessorContext context) {
+            this.name = name;
+            this.map = new TreeMap<>();
+            this.context = context;
+        }
+
+        @Override
+        public String name() {
+            return this.name;
+        }
+
+        @Override
+        public boolean persistent() {
+            return false;
+        }
+
+        @Override
+        public V get(K key) {
+            return this.map.get(key);
+        }
+
+        @Override
+        public void put(K key, V value) {
+            this.map.put(key, value);
+        }
+
+        @Override
+        public void putAll(List<Entry<K, V>> entries) {
+            for (Entry<K, V> entry : entries)
+                put(entry.key(), entry.value());
+        }
+
+        @Override
+        public V delete(K key) {
+            return this.map.remove(key);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> range(K from, K to) {
+            return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
+        }
+
+        @Override
+        public KeyValueIterator<K, V> all() {
+            return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
+        }
+
+        @Override
+        public void flush() {
+            // do-nothing since it is in-memory
+        }
+
+        public void restore() {
+            // this should not happen since the store is in-memory, hence there is no state to load from disk
+            throw new IllegalStateException("In-memory store should not be restored from the changelog");
+        }
+
+        @Override
+        public void close() {
+            // do-nothing
+        }
+
+        private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
+            private final Iterator<Map.Entry<K, V>> iter;
+
+            public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
+                this.iter = iter;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return iter.hasNext();
+            }
+
+            @Override
+            public Entry<K, V> next() {
+                Map.Entry<K, V> entry = iter.next();
+                return new Entry<>(entry.getKey(), entry.getValue());
+            }
+
+            @Override
+            public void remove() {
+                iter.remove();
+            }
+
+            @Override
+            public void close() {
+            }
+
+        }
+    }
+}
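The range() implementation above delegates to TreeMap.subMap(from, true, to, false), so a range query includes the "from" key and excludes the "to" key. A minimal standalone sketch of those semantics (plain Java, no Kafka dependencies assumed):

    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class RangeSemanticsDemo {
        public static void main(String[] args) {
            NavigableMap<String, Integer> map = new TreeMap<>();
            map.put("a", 1);
            map.put("b", 2);
            map.put("c", 3);
            // Prints {a=1, b=2}: "c" falls on the exclusive upper bound
            System.out.println(map.subMap("a", true, "c", false));
        }
    }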

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
new file mode 100644
index 0000000..0fbd4ae
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>>, Closeable {
+
+    @Override
+    public void close();
+}
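Narrowing Closeable.close() to drop the checked IOException lets callers release the iterator's underlying resources (for example a RocksDB iterator) without a try/catch. A hedged usage sketch, assuming a variable "store" of type KeyValueStore<String, String> from the next file:

    KeyValueIterator<String, String> iter = store.range("a", "z");
    try {
        while (iter.hasNext()) {
            Entry<String, String> entry = iter.next();
            System.out.println(entry.key() + " -> " + entry.value());
        }
    } finally {
        iter.close(); // iterators returned by range()/all() MUST be closed after use
    }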

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
new file mode 100644
index 0000000..e4faed1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.List;
+
+/**
+ * A key-value store that supports put/get/delete and range queries.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public interface KeyValueStore<K, V> extends StateStore {
+
+    /**
+     * Get the value corresponding to this key
+     *
+     * @param key The key to fetch
+     * @return The value or null if no value is found.
+     * @throws NullPointerException If null is used for key.
+     */
+    V get(K key);
+
+    /**
+     * Update the value associated with this key
+     *
+     * @param key They key to associate the value to
+     * @param value The value
+     * @throws NullPointerException If null is used for key or value.
+     */
+    void put(K key, V value);
+
+    /**
+     * Update all the given key/value pairs
+     *
+     * @param entries A list of entries to put into the store.
+     * @throws NullPointerException If null is used for any key or value.
+     */
+    void putAll(List<Entry<K, V>> entries);
+
+    /**
+     * Delete the value from the store (if there is one)
+     *
+     * @param key The key
+     * @return The old value or null if there is no such key.
+     * @throws NullPointerException If null is used for key.
+     */
+    V delete(K key);
+
+    /**
+     * Get an iterator over a given range of keys. This iterator MUST be closed after use.
+     *
+     * @param from The first key that could be in the range
+     * @param to The last key that could be in the range
+     * @return The iterator for this range.
+     * @throws NullPointerException If null is used for from or to.
+     */
+    KeyValueIterator<K, V> range(K from, K to);
+
+    /**
+     * Return an iterator over all keys in the database. This iterator MUST be closed after use.
+     *
+     * @return An iterator of all key/value pairs in the store.
+     */
+    KeyValueIterator<K, V> all();
+
+}
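A short sketch of the batch-update and delete contracts, assuming a variable "store" of type KeyValueStore<String, Long> and the Entry(key, value) constructor used elsewhere in this patch:

    List<Entry<String, Long>> batch = new ArrayList<>();
    batch.add(new Entry<>("alice", 1L));
    batch.add(new Entry<>("bob", 2L));
    store.putAll(batch);                 // equivalent to put("alice", 1L) and put("bob", 2L)
    Long old = store.delete("alice");    // returns 1L; a later get("alice") returns null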

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
new file mode 100644
index 0000000..018f1c6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.RestoreFunc;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
+
+    protected final KeyValueStore<K, V> inner;
+
+    private final Time time;
+    private final String group;
+    private final Sensor putTime;
+    private final Sensor getTime;
+    private final Sensor deleteTime;
+    private final Sensor putAllTime;
+    private final Sensor allTime;
+    private final Sensor rangeTime;
+    private final Sensor flushTime;
+    private final Sensor restoreTime;
+    private final Metrics metrics;
+
+    private final String topic;
+    private final int partition;
+    private final Set<K> dirty;
+    private final int maxDirty;
+    private final ProcessorContext context;
+
+    // the metered store always wraps the raw store, layering metrics and change logging on top
+    @SuppressWarnings("unchecked")
+    public MeteredKeyValueStore(final String name, final KeyValueStore<K, V> inner, ProcessorContext context, String group, Time time) {
+        this.inner = inner;
+
+        this.time = time;
+        this.group = group;
+        this.metrics = context.metrics();
+        this.putTime = createSensor(name, "put");
+        this.getTime = createSensor(name, "get");
+        this.deleteTime = createSensor(name, "delete");
+        this.putAllTime = createSensor(name, "put-all");
+        this.allTime = createSensor(name, "all");
+        this.rangeTime = createSensor(name, "range");
+        this.flushTime = createSensor(name, "flush");
+        this.restoreTime = createSensor(name, "restore");
+
+        this.topic = name;
+        this.partition = context.id();
+
+        this.context = context;
+
+        this.dirty = new HashSet<K>();
+        this.maxDirty = 100;        // TODO: this needs to be configurable
+
+        // register and possibly restore the state from the logs
+        long startNs = time.nanoseconds();
+        try {
+            final Deserializer<K> keyDeserializer = (Deserializer<K>) context.keyDeserializer();
+            final Deserializer<V> valDeserializer = (Deserializer<V>) context.valueDeserializer();
+
+            context.register(this, new RestoreFunc() {
+                @Override
+                public void apply(byte[] key, byte[] value) {
+                    inner.put(keyDeserializer.deserialize(topic, key),
+                        valDeserializer.deserialize(topic, value));
+                }
+            });
+        } finally {
+            recordLatency(this.restoreTime, startNs, time.nanoseconds());
+        }
+    }
+
+    private Sensor createSensor(String storeName, String operation) {
+        Sensor parent = metrics.sensor(operation);
+        addLatencyMetrics(parent, operation);
+        Sensor sensor = metrics.sensor(storeName + "-" + operation, parent);
+        addLatencyMetrics(sensor, operation, "store-name", storeName);
+        return sensor;
+    }
+
+    private void addLatencyMetrics(Sensor sensor, String opName, String... kvs) {
+        maybeAddMetric(sensor, new MetricName(opName + "-avg-latency-ms", group, "The average latency in milliseconds of the key-value store operation.", kvs), new Avg());
+        maybeAddMetric(sensor, new MetricName(opName + "-max-latency-ms", group, "The max latency in milliseconds of the key-value store operation.", kvs), new Max());
+        maybeAddMetric(sensor, new MetricName(opName + "-qps", group, "The average number of occurance of the given key-value store operation per second.", kvs), new Rate(new Count()));
+    }
+
+    private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
+        if (!metrics.metrics().containsKey(name))
+            sensor.add(name, stat);
+    }
+
+    @Override
+    public String name() {
+        return inner.name();
+    }
+
+    @Override
+    public boolean persistent() {
+        return inner.persistent();
+    }
+
+    @Override
+    public V get(K key) {
+        long startNs = time.nanoseconds();
+        try {
+            return this.inner.get(key);
+        } finally {
+            recordLatency(this.getTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
+    public void put(K key, V value) {
+        long startNs = time.nanoseconds();
+        try {
+            this.inner.put(key, value);
+
+            this.dirty.add(key);
+            if (this.dirty.size() > this.maxDirty)
+                logChange();
+        } finally {
+            recordLatency(this.putTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
+    public void putAll(List<Entry<K, V>> entries) {
+        long startNs = time.nanoseconds();
+        try {
+            this.inner.putAll(entries);
+
+            for (Entry<K, V> entry : entries) {
+                this.dirty.add(entry.key());
+            }
+
+            if (this.dirty.size() > this.maxDirty)
+                logChange();
+        } finally {
+            recordLatency(this.putAllTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
+    public V delete(K key) {
+        long startNs = time.nanoseconds();
+        try {
+            V value = this.inner.delete(key);
+
+            this.dirty.add(key);
+            if (this.dirty.size() > this.maxDirty)
+                logChange();
+
+            return value;
+        } finally {
+            recordLatency(this.deleteTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
+    public KeyValueIterator<K, V> range(K from, K to) {
+        return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime);
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+
+    @Override
+    public void flush() {
+        long startNs = time.nanoseconds();
+        try {
+            this.inner.flush();
+            logChange();
+        } finally {
+            recordLatency(this.flushTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void logChange() {
+        RecordCollector collector = ((ProcessorContextImpl) context).recordCollector();
+        Serializer<K> keySerializer = (Serializer<K>) context.keySerializer();
+        Serializer<V> valueSerializer = (Serializer<V>) context.valueSerializer();
+
+        if (collector != null) {
+            for (K k : this.dirty) {
+                V v = this.inner.get(k);
+                collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
+            }
+            this.dirty.clear();
+        }
+    }
+
+    private void recordLatency(Sensor sensor, long startNs, long endNs) {
+        sensor.record((endNs - startNs) / 1000000.0, time.milliseconds()); // latency in ms, stamped with wall-clock time
+    }
+
+    private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {
+
+        private final KeyValueIterator<K1, V1> iter;
+        private final Sensor sensor;
+        private final long startNs;
+
+        public MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) {
+            this.iter = iter;
+            this.sensor = sensor;
+            this.startNs = time.nanoseconds();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public Entry<K1, V1> next() {
+            return iter.next();
+        }
+
+        @Override
+        public void remove() {
+            iter.remove();
+        }
+
+        @Override
+        public void close() {
+            try {
+                iter.close();
+            } finally {
+                recordLatency(this.sensor, this.startNs, time.nanoseconds());
+            }
+        }
+
+    }
+
+}
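Note how deletes reach the changelog: delete(key) marks the key dirty, and on the next logChange() the lookup this.inner.get(k) returns null, so a null-valued record is sent to the store's topic; on a log-compacted changelog topic that record acts as a tombstone. A condensed sketch of the path:

    store.delete("gone");   // removes the key from the inner store and adds it to the dirty set
    store.flush();          // flush() calls logChange(), which sends
                            // new ProducerRecord<>(topic, partition, "gone", null)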

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
new file mode 100644
index 0000000..e04de68
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class saves out a map of topic/partition=&gt;offsets to a file. The format of the file is UTF-8 text containing the following:
+ * <pre>
+ *   &lt;version&gt;
+ *   &lt;n&gt;
+ *   &lt;topic_name_1&gt; &lt;partition_1&gt; &lt;offset_1&gt;
+ *   .
+ *   .
+ *   .
+ *   &lt;topic_name_n&gt; &lt;partition_n&gt; &lt;offset_n&gt;
+ * </pre>
+ *   The first line contains a number designating the format version (currently 0), the second line contains
+ *   a number giving the total number of offsets. Each successive line gives a topic/partition/offset triple
+ *   separated by spaces.
+ */
+public class OffsetCheckpoint {
+
+    private static final int VERSION = 0;
+
+    private final File file;
+    private final Object lock;
+
+    public OffsetCheckpoint(File file) throws IOException {
+        new File(file + ".tmp").delete(); // try to delete any existing temp files for cleanliness
+        this.file = file;
+        this.lock = new Object();
+    }
+
+    public void write(Map<TopicPartition, Long> offsets) throws IOException {
+        synchronized (lock) {
+            // write to temp file and then swap with the existing file
+            File temp = new File(file.getAbsolutePath() + ".tmp");
+
+            FileOutputStream fileOutputStream = new FileOutputStream(temp);
+            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream));
+            try {
+                writeIntLine(writer, VERSION);
+                writeIntLine(writer, offsets.size());
+
+                // write the entries
+                for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
+                    writeEntry(writer, entry.getKey(), entry.getValue());
+
+                // flush the buffer and then fsync the underlying file
+                writer.flush();
+                fileOutputStream.getFD().sync();
+            } finally {
+                writer.close();
+            }
+
+            // swap new offset checkpoint file with previous one
+            if (!temp.renameTo(file)) {
+                // renameTo() fails on Windows if the destination file exists.
+                file.delete();
+                if (!temp.renameTo(file))
+                    throw new IOException(String.format("File rename from %s to %s failed.",
+                        temp.getAbsolutePath(),
+                        file.getAbsolutePath()));
+            }
+        }
+    }
+
+    private void writeIntLine(BufferedWriter writer, int number) throws IOException {
+        writer.write(Integer.toString(number));
+        writer.newLine();
+    }
+
+    private void writeEntry(BufferedWriter writer, TopicPartition part, long offset) throws IOException {
+        writer.write(part.topic());
+        writer.write(' ');
+        writer.write(Integer.toString(part.partition()));
+        writer.write(' ');
+        writer.write(Long.toString(offset));
+        writer.newLine();
+    }
+
+    public Map<TopicPartition, Long> read() throws IOException {
+        synchronized (lock) {
+            BufferedReader reader = null;
+            try {
+                reader = new BufferedReader(new FileReader(file));
+            } catch (FileNotFoundException e) {
+                return Collections.emptyMap();
+            }
+
+            try {
+                int version = readInt(reader);
+                switch (version) {
+                    case 0:
+                        int expectedSize = readInt(reader);
+                        Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
+                        String line = reader.readLine();
+                        while (line != null) {
+                            String[] pieces = line.split("\\s+");
+                            if (pieces.length != 3)
+                                throw new IOException(String.format("Malformed line in offset checkpoint file: '%s'.",
+                                    line));
+
+                            String topic = pieces[0];
+                            int partition = Integer.parseInt(pieces[1]);
+                            long offset = Long.parseLong(pieces[2]);
+                            offsets.put(new TopicPartition(topic, partition), offset);
+                            line = reader.readLine();
+                        }
+                        if (offsets.size() != expectedSize)
+                            throw new IOException(String.format("Expected %d entries but found only %d",
+                                expectedSize,
+                                offsets.size()));
+                        return offsets;
+
+                    default:
+                        throw new IllegalArgumentException("Unknown offset checkpoint version: " + version);
+                }
+            } finally {
+                if (reader != null)
+                    reader.close();
+            }
+        }
+    }
+
+    private int readInt(BufferedReader reader) throws IOException {
+        String line = reader.readLine();
+        if (line == null)
+            throw new EOFException("File ended prematurely.");
+        int val = Integer.parseInt(line);
+        return val;
+    }
+
+    public void delete() throws IOException {
+        file.delete();
+    }
+
+    @Override
+    public String toString() {
+        return this.file.getAbsolutePath();
+    }
+
+}
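A worked example of the format described in the class javadoc; the file path and topic names are illustrative only, and entry order follows the map's iteration order:

    import org.apache.kafka.common.TopicPartition;

    import java.io.File;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class CheckpointDemo {
        public static void main(String[] args) throws IOException {
            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File("/tmp/offsets.ckpt"));

            Map<TopicPartition, Long> offsets = new HashMap<>();
            offsets.put(new TopicPartition("topicA", 0), 100L);
            offsets.put(new TopicPartition("topicB", 1), 200L);
            checkpoint.write(offsets);

            // The checkpoint file now contains:
            //   0              <- format version
            //   2              <- number of entries
            //   topicA 0 100   <- topic, partition, offset
            //   topicB 1 200
            System.out.println(checkpoint.read()); // {topicA-0=100, topicB-1=200}
        }
    }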

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
new file mode 100644
index 0000000..e0962a2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.SystemTime;
+
+import org.apache.kafka.common.utils.Time;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class RocksDBKeyValueStore extends MeteredKeyValueStore<byte[], byte[]> {
+
+    public RocksDBKeyValueStore(String name, ProcessorContext context) {
+        this(name, context, new SystemTime());
+    }
+
+    public RocksDBKeyValueStore(String name, ProcessorContext context, Time time) {
+        super(name, new RocksDBStore(name, context), context, "kafka-streams", time);
+    }
+
+    private static class RocksDBStore implements KeyValueStore<byte[], byte[]> {
+
+        private static final int TTL_NOT_USED = -1;
+
+        // TODO: these values should be configurable
+        private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
+        private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
+        private static final long BLOCK_SIZE = 4096L;
+        private static final int TTL_SECONDS = TTL_NOT_USED;
+        private static final int MAX_WRITE_BUFFERS = 3;
+        private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
+        private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
+        private static final String DB_FILE_DIR = "rocksdb";
+
+        private final String topic;
+        private final int partition;
+        private final ProcessorContext context;
+
+        private final Options options;
+        private final WriteOptions wOptions;
+        private final FlushOptions fOptions;
+
+        private final String dbName;
+        private final String dirName;
+
+        private RocksDB db;
+
+        public RocksDBStore(String name, ProcessorContext context) {
+            this.topic = name;
+            this.partition = context.id();
+            this.context = context;
+
+            // initialize the rocksdb options
+            BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+            tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
+            tableConfig.setBlockSize(BLOCK_SIZE);
+
+            options = new Options();
+            options.setTableFormatConfig(tableConfig);
+            options.setWriteBufferSize(WRITE_BUFFER_SIZE);
+            options.setCompressionType(COMPRESSION_TYPE);
+            options.setCompactionStyle(COMPACTION_STYLE);
+            options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
+            options.setCreateIfMissing(true);
+            options.setErrorIfExists(false);
+
+            wOptions = new WriteOptions();
+            wOptions.setDisableWAL(true);
+
+            fOptions = new FlushOptions();
+            fOptions.setWaitForFlush(true);
+
+            dbName = this.topic + "." + this.partition;
+            dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
+
+            db = openDB(new File(dirName, dbName), this.options, TTL_SECONDS);
+        }
+
+        private RocksDB openDB(File dir, Options options, int ttl) {
+            try {
+                if (ttl == TTL_NOT_USED) {
+                    return RocksDB.open(options, dir.toString());
+                } else {
+                    throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
+                    // TODO: support TTL with change log?
+                    // return TtlDB.open(options, dir.toString(), ttl, false);
+                }
+            } catch (RocksDBException e) {
+                // TODO: this needs to be handled more accurately
+                throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
+            }
+        }
+
+        @Override
+        public String name() {
+            return this.topic;
+        }
+
+        @Override
+        public boolean persistent() {
+            return true; // RocksDB keeps its data on disk, so this store is persistent
+        }
+
+        @Override
+        public byte[] get(byte[] key) {
+            try {
+                return this.db.get(key);
+            } catch (RocksDBException e) {
+                // TODO: this needs to be handled more accurately
+                throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
+            }
+        }
+
+        @Override
+        public void put(byte[] key, byte[] value) {
+            try {
+                if (value == null) {
+                    db.remove(wOptions, key);
+                } else {
+                    db.put(wOptions, key, value);
+                }
+            } catch (RocksDBException e) {
+                // TODO: this needs to be handled more accurately
+                throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
+            }
+        }
+
+        @Override
+        public void putAll(List<Entry<byte[], byte[]>> entries) {
+            for (Entry<byte[], byte[]> entry : entries)
+                put(entry.key(), entry.value());
+        }
+
+        @Override
+        public byte[] delete(byte[] key) {
+            byte[] value = get(key);
+            put(key, null);
+            return value;
+        }
+
+        @Override
+        public KeyValueIterator<byte[], byte[]> range(byte[] from, byte[] to) {
+            return new RocksDBRangeIterator(db.newIterator(), from, to);
+        }
+
+        @Override
+        public KeyValueIterator<byte[], byte[]> all() {
+            RocksIterator innerIter = db.newIterator();
+            innerIter.seekToFirst();
+            return new RocksDbIterator(innerIter);
+        }
+
+        @Override
+        public void flush() {
+            try {
+                db.flush(fOptions);
+            } catch (RocksDBException e) {
+                // TODO: this needs to be handled more accurately
+                throw new KafkaException("Error while executing flush from store " + this.topic, e);
+            }
+        }
+
+        @Override
+        public void close() {
+            flush();
+            db.close();
+        }
+
+        private static class RocksDbIterator implements KeyValueIterator<byte[], byte[]> {
+            private final RocksIterator iter;
+
+            public RocksDbIterator(RocksIterator iter) {
+                this.iter = iter;
+            }
+
+            protected byte[] peekKey() {
+                return this.getEntry().key();
+            }
+
+            protected Entry<byte[], byte[]> getEntry() {
+                return new Entry<>(iter.key(), iter.value());
+            }
+
+            @Override
+            public boolean hasNext() {
+                return iter.isValid();
+            }
+
+            @Override
+            public Entry<byte[], byte[]> next() {
+                if (!hasNext())
+                    throw new NoSuchElementException();
+
+                Entry<byte[], byte[]> entry = this.getEntry();
+                iter.next();
+
+                return entry;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("RocksDB iterator does not support remove");
+            }
+
+            @Override
+            public void close() {
+            }
+
+        }
+
+        private static class LexicographicComparator implements Comparator<byte[]> {
+
+            @Override
+            public int compare(byte[] left, byte[] right) {
+                for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
+                    int leftByte = left[i] & 0xff;
+                    int rightByte = right[j] & 0xff;
+                    if (leftByte != rightByte) {
+                        return leftByte - rightByte;
+                    }
+                }
+                return left.length - right.length;
+            }
+        }
+
+        private static class RocksDBRangeIterator extends RocksDbIterator {
+            // RocksDB's JNI interface does not expose getters/setters that allow the
+            // comparator to be pluggable, and the default is lexicographic, so it's
+            // safe to just force lexicographic comparator here for now.
+            private final Comparator<byte[]> comparator = new LexicographicComparator();
+            private final byte[] to;
+
+            public RocksDBRangeIterator(RocksIterator iter, byte[] from, byte[] to) {
+                super(iter);
+                iter.seek(from);
+                this.to = to;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return super.hasNext() && comparator.compare(super.peekKey(), this.to) < 0;
+            }
+        }
+
+    }
+}
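The LexicographicComparator masks each byte with 0xff so that bytes compare as unsigned values, matching RocksDB's default bytewise ordering; with Java's signed bytes, 0x80 through 0xff would otherwise sort before 0x00. A standalone sketch of the difference:

    public class UnsignedCompareDemo {
        public static void main(String[] args) {
            byte a = (byte) 0x80; // -128 signed, 128 unsigned
            byte b = (byte) 0x01;
            System.out.println(Integer.compare(a, b));               // -1: signed order puts a before b
            System.out.println(Integer.compare(a & 0xff, b & 0xff)); //  1: unsigned (RocksDB) order puts a after b
        }
    }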

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
new file mode 100644
index 0000000..49171e3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.KStreamImpl;
+import org.apache.kafka.streams.processor.TopologyException;
+import org.junit.Test;
+
+public class KStreamBuilderTest {
+
+    @Test(expected = TopologyException.class)
+    public void testFrom() {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        builder.from("topic-1", "topic-2");
+
+        builder.addSource(KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.decrementAndGet(), "topic-3");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FilteredIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FilteredIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FilteredIteratorTest.java
new file mode 100644
index 0000000..405c7c9
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FilteredIteratorTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+public class FilteredIteratorTest {
+
+    @Test
+    public void testFiltering() {
+        List<Integer> list = Arrays.asList(3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5);
+
+        Iterator<String> filtered = new FilteredIterator<String, Integer>(list.iterator()) {
+            protected String filter(Integer i) {
+                if (i % 3 == 0) return i.toString();
+                return null;
+            }
+        };
+
+        List<String> expected = Arrays.asList("3", "9", "6", "3");
+        List<String> result = new ArrayList<String>();
+
+        while (filtered.hasNext()) {
+            result.add(filtered.next());
+        }
+
+        assertEquals(expected, result);
+    }
+
+    @Test
+    public void testEmptySource() {
+        List<Integer> list = new ArrayList<Integer>();
+
+        Iterator<String> filtered = new FilteredIterator<String, Integer>(list.iterator()) {
+            protected String filter(Integer i) {
+                if (i % 3 == 0) return i.toString();
+                return null;
+            }
+        };
+
+        List<String> expected = new ArrayList<String>();
+        List<String> result = new ArrayList<String>();
+
+        while (filtered.hasNext()) {
+            result.add(filtered.next());
+        }
+
+        assertEquals(expected, result);
+    }
+
+    @Test
+    public void testNoMatch() {
+        List<Integer> list = Arrays.asList(3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5);
+
+        Iterator<String> filtered = new FilteredIterator<String, Integer>(list.iterator()) {
+            protected String filter(Integer i) {
+                if (i % 7 == 0) return i.toString();
+                return null;
+            }
+        };
+
+        List<String> expected = new ArrayList<String>();
+        List<String> result = new ArrayList<String>();
+
+        while (filtered.hasNext()) {
+            result.add(filtered.next());
+        }
+
+        assertEquals(expected, result);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
new file mode 100644
index 0000000..c18ddfe
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorDef;
+import org.junit.Test;
+
+import java.lang.reflect.Array;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamBranchTest {
+
+    private String topicName = "topic";
+
+    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+    private StringDeserializer valDeserializer = new StringDeserializer();
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testKStreamBranch() {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        Predicate<Integer, String> isEven = new Predicate<Integer, String>() {
+            @Override
+            public boolean apply(Integer key, String value) {
+                return (key % 2) == 0;
+            }
+        };
+        Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
+            @Override
+            public boolean apply(Integer key, String value) {
+                return (key % 3) == 0;
+            }
+        };
+        Predicate<Integer, String> isOdd = new Predicate<Integer, String>() {
+            @Override
+            public boolean apply(Integer key, String value) {
+                return (key % 2) != 0;
+            }
+        };
+
+        final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6};
+
+        KStream<Integer, String> stream;
+        KStream<Integer, String>[] branches;
+        MockProcessorDef<Integer, String>[] processors;
+
+        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        branches = stream.branch(isEven, isMultipleOfThree, isOdd);
+
+        assertEquals(3, branches.length);
+
+        processors = (MockProcessorDef<Integer, String>[]) Array.newInstance(MockProcessorDef.class, branches.length);
+        for (int i = 0; i < branches.length; i++) {
+            processors[i] = new MockProcessorDef<>();
+            branches[i].process(processors[i]);
+        }
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
+        }
+
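+        // branch() routes each record to the first matching predicate only: evens {2, 4, 6}
+        // reach branch 0, then {3} reaches branch 1, and the remaining odds {1, 5} reach branch 2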
+        assertEquals(3, processors[0].processed.size());
+        assertEquals(1, processors[1].processed.size());
+        assertEquals(2, processors[2].processed.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
new file mode 100644
index 0000000..b80e1e2
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorDef;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamFilterTest {
+
+    private String topicName = "topic";
+
+    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+    private StringDeserializer valDeserializer = new StringDeserializer();
+
+    private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
+        @Override
+        public boolean apply(Integer key, String value) {
+            return (key % 3) == 0;
+        }
+    };
+
+    @Test
+    public void testFilter() {
+        KStreamBuilder builder = new KStreamBuilder();
+        final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
+
+        KStream<Integer, String> stream;
+        MockProcessorDef<Integer, String> processor;
+
+        processor = new MockProcessorDef<>();
+        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream.filter(isMultipleOfThree).process(processor);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
+        }
+
+        assertEquals(2, processor.processed.size());
+    }
+
+    @Test
+    public void testFilterOut() {
+        KStreamBuilder builder = new KStreamBuilder();
+        final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
+
+        KStream<Integer, String> stream;
+        MockProcessorDef<Integer, String> processor;
+
+        processor = new MockProcessorDef<>();
+        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream.filterOut(isMultipleOfThree).process(processor);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
+        }
+
+        assertEquals(5, processor.processed.size());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
new file mode 100644
index 0000000..e87223e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorDef;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+
+public class KStreamFlatMapTest {
+
+    private String topicName = "topic";
+
+    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+    private StringDeserializer valDeserializer = new StringDeserializer();
+
+    @Test
+    public void testFlatMap() {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KeyValueMapper<Integer, String, Iterable<KeyValue<String, String>>> mapper =
+            new KeyValueMapper<Integer, String, Iterable<KeyValue<String, String>>>() {
+                @Override
+                public Iterable<KeyValue<String, String>> apply(Integer key, String value) {
+                    ArrayList<KeyValue<String, String>> result = new ArrayList<>();
+                    for (int i = 0; i < key; i++) {
+                        result.add(KeyValue.pair(Integer.toString(key * 10 + i), value));
+                    }
+                    return result;
+                }
+            };
+
+        final int[] expectedKeys = {0, 1, 2, 3};
+
+        KStream<Integer, String> stream;
+        MockProcessorDef<String, String> processor;
+
+        processor = new MockProcessorDef<>();
+        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream.flatMap(mapper).process(processor);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
+        }
+
+        assertEquals(6, processor.processed.size());
+
+        String[] expected = {"10:V1", "20:V2", "21:V2", "30:V3", "31:V3", "32:V3"};
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], processor.processed.get(i));
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
new file mode 100644
index 0000000..09dda65
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorDef;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamFlatMapValuesTest {
+
+    private String topicName = "topic";
+
+    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+    private StringDeserializer valDeserializer = new StringDeserializer();
+
+    @Test
+    public void testFlatMapValues() {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        ValueMapper<String, Iterable<String>> mapper =
+            new ValueMapper<String, Iterable<String>>() {
+                @Override
+                public Iterable<String> apply(String value) {
+                    ArrayList<String> result = new ArrayList<>();
+                    result.add(value.toLowerCase());
+                    result.add(value);
+                    return result;
+                }
+            };
+
+        final int[] expectedKeys = {0, 1, 2, 3};
+
+        KStream<Integer, String> stream;
+        MockProcessorDef<Integer, String> processor;
+
+        processor = new MockProcessorDef<>();
+        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream.flatMapValues(mapper).process(processor);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
+        }
+
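+        // each input value is emitted twice (lowercased, then as-is), so four inputs yield eight records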
+        assertEquals(8, processor.processed.size());
+
+        String[] expected = {"0:v0", "0:V0", "1:v1", "1:V1", "2:v2", "2:V2", "3:v3", "3:V3"};
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], processor.processed.get(i));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
new file mode 100644
index 0000000..0660ddd
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.MockProcessorDef;
+import org.apache.kafka.test.UnlimitedWindowDef;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamImplTest {
+
+    @Test
+    public void testNumProcesses() {
+        final Deserializer<String> deserializer = new StringDeserializer();
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<String, String> source1 = builder.from(deserializer, deserializer, "topic-1", "topic-2");
+
+        KStream<String, String> source2 = builder.from(deserializer, deserializer, "topic-3", "topic-4");
+
+        KStream<String, String> stream1 =
+            source1.filter(new Predicate<String, String>() {
+                @Override
+                public boolean apply(String key, String value) {
+                    return true;
+                }
+            }).filterOut(new Predicate<String, String>() {
+                @Override
+                public boolean apply(String key, String value) {
+                    return false;
+                }
+            });
+
+        KStream<String, Integer> stream2 = stream1.mapValues(new ValueMapper<String, Integer>() {
+            @Override
+            public Integer apply(String value) {
+                return Integer.valueOf(value);
+            }
+        });
+
+        KStream<String, Integer> stream3 = source2.flatMapValues(new ValueMapper<String, Iterable<Integer>>() {
+            @Override
+            public Iterable<Integer> apply(String value) {
+                return Collections.singletonList(Integer.valueOf(value));
+            }
+        });
+
+        KStream<String, Integer>[] streams2 = stream2.branch(
+            new Predicate<String, Integer>() {
+                @Override
+                public boolean apply(String key, Integer value) {
+                    return (value % 2) == 0;
+                }
+            },
+            new Predicate<String, Integer>() {
+                @Override
+                public boolean apply(String key, Integer value) {
+                    return true;
+                }
+            }
+        );
+
+        KStream<String, Integer>[] streams3 = stream3.branch(
+            new Predicate<String, Integer>() {
+                @Override
+                public boolean apply(String key, Integer value) {
+                    return (value % 2) == 0;
+                }
+            },
+            new Predicate<String, Integer>() {
+                @Override
+                public boolean apply(String key, Integer value) {
+                    return true;
+                }
+            }
+        );
+
+        KStream<String, Integer> stream4 = streams2[0].with(new UnlimitedWindowDef<String, Integer>("window"))
+            .join(streams3[0].with(new UnlimitedWindowDef<String, Integer>("window")), new ValueJoiner<Integer, Integer, Integer>() {
+                @Override
+                public Integer apply(Integer value1, Integer value2) {
+                    return value1 + value2;
+                }
+            });
+
+        KStream<String, Integer> stream5 = streams2[1].with(new UnlimitedWindowDef<String, Integer>("window"))
+            .join(streams3[1].with(new UnlimitedWindowDef<String, Integer>("window")), new ValueJoiner<Integer, Integer, Integer>() {
+                @Override
+                public Integer apply(Integer value1, Integer value2) {
+                    return value1 + value2;
+                }
+            });
+
+        stream4.to("topic-5");
+
+        stream5.through("topic-6").process(new MockProcessorDef<>()).to("topic-7");
+
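+        // each term below counts the processor nodes added by one operator,
+        // e.g. a branch contributes the branch node plus one child node per predicate (the 1 + 2 terms)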
+        assertEquals(2 + // sources
+            2 + // stream1
+            1 + // stream2
+            1 + // stream3
+            1 + 2 + // streams2
+            1 + 2 + // streams3
+            2 + 3 + // stream4
+            2 + 3 + // stream5
+            1 + // to
+            2 + // through
+            1 + // process
+            1, // to
+            builder.build().processors().size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java
new file mode 100644
index 0000000..7dea8e0
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStreamWindowed;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorDef;
+import org.apache.kafka.test.UnlimitedWindowDef;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamJoinTest {
+
+    private String topic1 = "topic1";
+    private String topic2 = "topic2";
+
+    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+    private StringDeserializer valDeserializer = new StringDeserializer();
+
+    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
+        @Override
+        public String apply(String value1, String value2) {
+            return value1 + "+" + value2;
+        }
+    };
+
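+    // the mappers below are not yet exercised by any test; presumably reserved
+    // for the joinability tests noted in the TODO at the bottom of this class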
+    private ValueMapper<String, String> valueMapper = new ValueMapper<String, String>() {
+        @Override
+        public String apply(String value) {
+            return "#" + value;
+        }
+    };
+
+    private ValueMapper<String, Iterable<String>> valueMapper2 = new ValueMapper<String, Iterable<String>>() {
+        @Override
+        public Iterable<String> apply(String value) {
+            return Utils.mkSet(value);
+        }
+    };
+
+    private KeyValueMapper<Integer, String, KeyValue<Integer, String>> keyValueMapper =
+        new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
+            @Override
+            public KeyValue<Integer, String> apply(Integer key, String value) {
+                return KeyValue.pair(key, value);
+            }
+        };
+
+    private KeyValueMapper<Integer, String, KeyValue<Integer, Iterable<String>>> keyValueMapper2 =
+        new KeyValueMapper<Integer, String, KeyValue<Integer, Iterable<String>>>() {
+            @Override
+            public KeyValue<Integer, Iterable<String>> apply(Integer key, String value) {
+                return KeyValue.pair(key, (Iterable<String>) Utils.mkSet(value));
+            }
+        };
+
+    @Test
+    public void testJoin() {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+        KStream<Integer, String> stream1;
+        KStream<Integer, String> stream2;
+        KStreamWindowed<Integer, String> windowed1;
+        KStreamWindowed<Integer, String> windowed2;
+        MockProcessorDef<Integer, String> processor;
+        String[] expected;
+
+        processor = new MockProcessorDef<>();
+        stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
+        stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
+        windowed1 = stream1.with(new UnlimitedWindowDef<Integer, String>("window1"));
+        windowed2 = stream2.with(new UnlimitedWindowDef<Integer, String>("window2"));
+
+        windowed1.join(windowed2, joiner).process(processor);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
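+        // pin the record timestamp so every record falls into the same (unlimited) window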
+        driver.setTime(0L);
+
+        // push two items to the main stream. the other stream's window is empty
+
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
+
+        assertEquals(0, processor.processed.size());
+
+        // push two items to the other stream. the main stream's window has two items
+
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
+
+        assertEquals(2, processor.processed.size());
+
+        expected = new String[]{"0:X0+Y0", "1:X1+Y1"};
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], processor.processed.get(i));
+        }
+
+        processor.processed.clear();
+
+        // push all items to the main stream. this should produce two items.
+
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
+
+        assertEquals(2, processor.processed.size());
+
+        expected = new String[]{"0:X0+Y0", "1:X1+Y1"};
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], processor.processed.get(i));
+        }
+
+        processor.processed.clear();
+
+        // the main stream's window now holds the previous two items plus all four new ones,
+        // so keys 0 and 1 appear twice.
+
+        // push all items to the other stream. this should produce six items
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
+
+        assertEquals(6, processor.processed.size());
+
+        expected = new String[]{"0:X0+Y0", "0:X0+Y0", "1:X1+Y1", "1:X1+Y1", "2:X2+Y2", "3:X3+Y3"};
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], processor.processed.get(i));
+        }
+    }
+
+    // TODO: test for joinability
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
new file mode 100644
index 0000000..bec524f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorDef;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamMapTest {
+
+    private String topicName = "topic";
+
+    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+    private StringDeserializer valDeserializer = new StringDeserializer();
+
+    @Test
+    public void testMap() {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KeyValueMapper<Integer, String, KeyValue<String, Integer>> mapper =
+            new KeyValueMapper<Integer, String, KeyValue<String, Integer>>() {
+                @Override
+                public KeyValue<String, Integer> apply(Integer key, String value) {
+                    return KeyValue.pair(value, key);
+                }
+            };
+
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+        KStream<Integer, String> stream;
+        MockProcessorDef<String, Integer> processor;
+
+        processor = new MockProcessorDef<>();
+        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream.map(mapper).process(processor);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
+        }
+
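+        // map() swaps each key and value, e.g. (0, "V0") becomes ("V0", 0)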
+        assertEquals(4, processor.processed.size());
+
+        String[] expected = new String[]{"V0:0", "V1:1", "V2:2", "V3:3"};
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], processor.processed.get(i));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
new file mode 100644
index 0000000..b6507fe
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorDef;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamMapValuesTest {
+
+    private String topicName = "topic";
+
+    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+    private StringDeserializer valDeserializer = new StringDeserializer();
+
+    @Test
+    public void testMapValues() {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        ValueMapper<String, Integer> mapper =
+            new ValueMapper<String, Integer>() {
+                @Override
+                public Integer apply(String value) {
+                    return value.length();
+                }
+            };
+
+        final int[] expectedKeys = {1, 10, 100, 1000};
+
+        KStream<Integer, String> stream;
+        MockProcessorDef<Integer, Integer> processor = new MockProcessorDef<>();
+        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream.mapValues(mapper).process(processor);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topicName, expectedKeys[i], Integer.toString(expectedKeys[i]));
+        }
+
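+        // each value is replaced by the length of its string form, e.g. "1000" maps to 4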
+        assertEquals(4, processor.processed.size());
+
+        String[] expected = {"1:1", "10:2", "100:3", "1000:4"};
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], processor.processed.get(i));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java
new file mode 100644
index 0000000..48a9fc3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.WindowDef;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.UnlimitedWindowDef;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamWindowedTest {
+
+    private String topicName = "topic";
+    private String windowName = "MyWindow";
+
+    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+    private StringDeserializer valDeserializer = new StringDeserializer();
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testWindowedStream() {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+        KStream<Integer, String> stream;
+        WindowDef<Integer, String> windowDef;
+
+        windowDef = new UnlimitedWindowDef<>(windowName);
+        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream.with(windowDef);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        Window<Integer, String> window = (Window<Integer, String>) driver.getStateStore(windowName);
+        driver.setTime(0L);
+
+        // two items in the window
+
+        for (int i = 0; i < 2; i++) {
+            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
+        }
+
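+        // find(key, timestamp) returns an iterator over the values stored for that key in the window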
+        assertEquals(1, countItem(window.find(0, 0L)));
+        assertEquals(1, countItem(window.find(1, 0L)));
+        assertEquals(0, countItem(window.find(2, 0L)));
+        assertEquals(0, countItem(window.find(3, 0L)));
+
+        // the window now holds the previous two items plus all four new ones, so keys 0 and 1 appear twice
+
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topicName, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
+
+        assertEquals(2, countItem(window.find(0, 0L)));
+        assertEquals(2, countItem(window.find(1, 0L)));
+        assertEquals(1, countItem(window.find(2, 0L)));
+        assertEquals(1, countItem(window.find(3, 0L)));
+    }
+
+    private <T> int countItem(Iterator<T> iter) {
+        int i = 0;
+        while (iter.hasNext()) {
+            i++;
+            iter.next();
+        }
+        return i;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
new file mode 100644
index 0000000..57a78ff
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.test.MockProcessorDef;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TopologyBuilderTest {
+
+    @Test(expected = TopologyException.class)
+    public void testAddSourceWithSameName() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("source", "topic-1");
+        builder.addSource("source", "topic-2");
+    }
+
+    @Test(expected = TopologyException.class)
+    public void testAddSourceWithSameTopic() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
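+        // a topic may be consumed by at most one source, so the second addSource must fail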
+        builder.addSource("source", "topic-1");
+        builder.addSource("source-2", "topic-1");
+    }
+
+    @Test(expected = TopologyException.class)
+    public void testAddProcessorWithSameName() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("source", "topic-1");
+        builder.addProcessor("processor", new MockProcessorDef(), "source");
+        builder.addProcessor("processor", new MockProcessorDef(), "source");
+    }
+
+    @Test(expected = TopologyException.class)
+    public void testAddProcessorWithWrongParent() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addProcessor("processor", new MockProcessorDef(), "source");
+    }
+
+    @Test(expected = TopologyException.class)
+    public void testAddProcessorWithSelfParent() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addProcessor("processor", new MockProcessorDef(), "processor");
+    }
+
+    @Test(expected = TopologyException.class)
+    public void testAddSinkWithSameName() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("source", "topic-1");
+        builder.addSink("sink", "topic-2", "source");
+        builder.addSink("sink", "topic-3", "source");
+    }
+
+    @Test(expected = TopologyException.class)
+    public void testAddSinkWithWrongParent() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSink("sink", "topic-2", "source");
+    }
+
+    @Test(expected = TopologyException.class)
+    public void testAddSinkWithSelfParent() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSink("sink", "topic-2", "sink");
+    }
+
+    @Test
+    public void testSourceTopics() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("source-1", "topic-1");
+        builder.addSource("source-2", "topic-2");
+        builder.addSource("source-3", "topic-3");
+
+        assertEquals(3, builder.sourceTopics().size());
+    }
+}

