kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [1/2] kafka git commit: KAFKA-2856: Add KTable non-stateful APIs along with standby task support
Date: Fri, 04 Dec 2015 22:59:30 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk cd54fc881 -> 39c3512ec


http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index b68f763..8aed6b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -21,16 +21,11 @@ import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
 
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
@@ -39,8 +34,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     protected final String metricGrp;
     protected final Time time;
 
-    private final String topic;
-
     private Sensor putTime;
     private Sensor getTime;
     private Sensor deleteTime;
@@ -51,26 +44,20 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     private Sensor restoreTime;
     private StreamingMetrics metrics;
 
-    private final Set<K> dirty;
-    private final Set<K> removed;
-    private final int maxDirty;
-    private final int maxRemoved;
-
-    private int partition;
-    private ProcessorContext context;
+    private boolean loggingEnabled = true;
+    private KeyValueStoreChangeLogger<K, V> changeLogger = null;
 
-    // always wrap the logged store with the metered store
+    // always wrap the store with the metered store
     public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricGrp, Time time) {
         this.inner = inner;
         this.serialization = serialization;
         this.metricGrp = metricGrp;
         this.time = time != null ? time : new SystemTime();
-        this.topic = inner.name();
+    }
 
-        this.dirty = new HashSet<K>();
-        this.removed = new HashSet<K>();
-        this.maxDirty = 100; // TODO: this needs to be configurable
-        this.maxRemoved = 100; // TODO: this needs to be configurable
+    public MeteredKeyValueStore<K, V> disableLogging() {
+        loggingEnabled = false;
+        return this;
     }
 
     @Override
@@ -80,7 +67,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public void init(ProcessorContext context) {
-        String name = name();
+        final String name = name();
         this.metrics = context.metrics();
         this.putTime = this.metrics.addLatencySensor(metricGrp, name, "put", "store-name", name);
         this.getTime = this.metrics.addLatencySensor(metricGrp, name, "get", "store-name", name);
@@ -92,8 +79,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
 
         serialization.init(context);
-        this.context = context;
-        this.partition = context.id().partition;
+        this.changeLogger = this.loggingEnabled ? new KeyValueStoreChangeLogger<>(name, context, serialization) : null;
 
         // register and possibly restore the state from the logs
         long startNs = time.nanoseconds();
@@ -105,8 +91,8 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
             context.register(this, new StateRestoreCallback() {
                 @Override
                 public void restore(byte[] key, byte[] value) {
-                    inner.put(keyDeserializer.deserialize(topic, key),
-                            valDeserializer.deserialize(topic, value));
+                    inner.put(keyDeserializer.deserialize(name, key),
+                            valDeserializer.deserialize(name, value));
                 }
             });
         } finally {
@@ -135,9 +121,10 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         try {
             this.inner.put(key, value);
 
-            this.dirty.add(key);
-            this.removed.remove(key);
-            maybeLogChange();
+            if (loggingEnabled) {
+                changeLogger.add(key);
+                changeLogger.maybeLogChange(this.inner);
+            }
         } finally {
             this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
         }
@@ -149,13 +136,13 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         try {
             this.inner.putAll(entries);
 
-            for (Entry<K, V> entry : entries) {
-                K key = entry.key();
-                this.dirty.add(key);
-                this.removed.remove(key);
+            if (loggingEnabled) {
+                for (Entry<K, V> entry : entries) {
+                    K key = entry.key();
+                    changeLogger.add(key);
+                }
+                changeLogger.maybeLogChange(this.inner);
             }
-
-            maybeLogChange();
         } finally {
             this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
         }
@@ -167,9 +154,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         try {
             V value = this.inner.delete(key);
 
-            this.dirty.remove(key);
-            this.removed.add(key);
-            maybeLogChange();
+            removed(key);
 
             return value;
         } finally {
@@ -179,14 +164,15 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     /**
      * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
-     * store other than {@link #delete(Object)}.
+     * store.
      *
      * @param key the key for the entry that the inner store removed
      */
     protected void removed(K key) {
-        this.dirty.remove(key);
-        this.removed.add(key);
-        maybeLogChange();
+        if (loggingEnabled) {
+            changeLogger.delete(key);
+            changeLogger.maybeLogChange(this.inner);
+        }
     }
 
     @Override
@@ -209,35 +195,14 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         long startNs = time.nanoseconds();
         try {
             this.inner.flush();
-            logChange();
+
+            if (loggingEnabled)
+                changeLogger.logChange(this.inner);
         } finally {
             this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
         }
     }
 
-    private void maybeLogChange() {
-        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
-            logChange();
-    }
-
-    private void logChange() {
-        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
-        if (collector != null) {
-            Serializer<K> keySerializer = serialization.keySerializer();
-            Serializer<V> valueSerializer = serialization.valueSerializer();
-
-            for (K k : this.removed) {
-                collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
-            }
-            for (K k : this.dirty) {
-                V v = this.inner.get(k);
-                collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
-            }
-            this.removed.clear();
-            this.dirty.clear();
-        }
-    }
-
     private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {
 
         private final KeyValueIterator<K1, V1> iter;
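
[Editor's note] The dirty/removed key tracking and the RecordCollector-based change logging deleted above are consolidated into a new KeyValueStoreChangeLogger helper. That helper is not shown in this part of the patch; the sketch below is inferred purely from the call sites in this diff (constructor arguments, add/delete, maybeLogChange/logChange) and may differ from the committed class.

    // Inferred sketch only -- reconstructed from the call sites above, not the committed file.
    public class KeyValueStoreChangeLogger<K, V> {

        public KeyValueStoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
            // holds the change-log topic, the task's partition and the serdes,
            // taking over the role of the removed topic/partition/dirty/removed fields
        }

        /** Mark a key as dirty so its latest value is sent to the change log. */
        public void add(K key) { }

        /** Mark a key as removed so a tombstone (null value) is sent to the change log. */
        public void delete(K key) { }

        /** Send pending changes only once the dirty/removed sets pass a threshold. */
        public void maybeLogChange(KeyValueStore<K, V> store) { }

        /** Unconditionally send all pending changes through the task's RecordCollector. */
        public void logChange(KeyValueStore<K, V> store) { }
    }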

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
index f1fbd9f..41314b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
@@ -17,25 +17,9 @@
 
 package org.apache.kafka.streams.state;
 
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionType;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteOptions;
-
-import java.io.File;
-import java.util.Comparator;
-import java.util.List;
-import java.util.NoSuchElementException;
 
 /**
  * A {@link KeyValueStore} that stores all entries in a local RocksDB database.
@@ -43,7 +27,7 @@ import java.util.NoSuchElementException;
  * @param <K> the type of keys
  * @param <V> the type of values
  *
- * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
+ * @see Stores#create(String)
  */
 public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
 
@@ -62,239 +46,7 @@ public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredKeyValueStore<K, V>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
+        return new MeteredKeyValueStore<>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
     }
 
-    private static class RocksDBStore<K, V> implements KeyValueStore<K, V> {
-
-        private static final int TTL_NOT_USED = -1;
-
-        // TODO: these values should be configurable
-        private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
-        private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
-        private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
-        private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
-        private static final long BLOCK_SIZE = 4096L;
-        private static final int TTL_SECONDS = TTL_NOT_USED;
-        private static final int MAX_WRITE_BUFFERS = 3;
-        private static final String DB_FILE_DIR = "rocksdb";
-
-        private final Serdes<K, V> serdes;
-        private final String topic;
-
-        private final Options options;
-        private final WriteOptions wOptions;
-        private final FlushOptions fOptions;
-
-        private ProcessorContext context;
-        private int partition;
-        private String dbName;
-        private String dirName;
-        private RocksDB db;
-
-        public RocksDBStore(String name, Serdes<K, V> serdes) {
-            this.topic = name;
-            this.serdes = serdes;
-
-            // initialize the rocksdb options
-            BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
-            tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
-            tableConfig.setBlockSize(BLOCK_SIZE);
-
-            options = new Options();
-            options.setTableFormatConfig(tableConfig);
-            options.setWriteBufferSize(WRITE_BUFFER_SIZE);
-            options.setCompressionType(COMPRESSION_TYPE);
-            options.setCompactionStyle(COMPACTION_STYLE);
-            options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
-            options.setCreateIfMissing(true);
-            options.setErrorIfExists(false);
-
-            wOptions = new WriteOptions();
-            wOptions.setDisableWAL(true);
-
-            fOptions = new FlushOptions();
-            fOptions.setWaitForFlush(true);
-        }
-
-        public void init(ProcessorContext context) {
-            serdes.init(context);
-
-            this.context = context;
-            this.partition = context.id().partition;
-            this.dbName = this.topic + "." + this.partition;
-            this.dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
-            this.db = openDB(new File(dirName, dbName), this.options, TTL_SECONDS);
-        }
-
-        private RocksDB openDB(File dir, Options options, int ttl) {
-            try {
-                if (ttl == TTL_NOT_USED) {
-                    dir.getParentFile().mkdirs();
-                    return RocksDB.open(options, dir.toString());
-                } else {
-                    throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
-                    // TODO: support TTL with change log?
-                    // return TtlDB.open(options, dir.toString(), ttl, false);
-                }
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
-            }
-        }
-
-        @Override
-        public String name() {
-            return this.topic;
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-
-        @Override
-        public V get(K key) {
-            try {
-                return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
-            }
-        }
-
-        @Override
-        public void put(K key, V value) {
-            try {
-                if (value == null) {
-                    db.remove(wOptions, serdes.rawKey(key));
-                } else {
-                    db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
-                }
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
-            }
-        }
-
-        @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
-        }
-
-        @Override
-        public V delete(K key) {
-            V value = get(key);
-            put(key, null);
-            return value;
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(K from, K to) {
-            return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            RocksIterator innerIter = db.newIterator();
-            innerIter.seekToFirst();
-            return new RocksDbIterator<K, V>(innerIter, serdes);
-        }
-
-        @Override
-        public void flush() {
-            try {
-                db.flush(fOptions);
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error while executing flush from store " + this.topic, e);
-            }
-        }
-
-        @Override
-        public void close() {
-            flush();
-            db.close();
-        }
-
-        private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
-            private final RocksIterator iter;
-            private final Serdes<K, V> serdes;
-
-            public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
-                this.iter = iter;
-                this.serdes = serdes;
-            }
-
-            protected byte[] peekRawKey() {
-                return iter.key();
-            }
-
-            protected Entry<K, V> getEntry() {
-                return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
-            }
-
-            @Override
-            public boolean hasNext() {
-                return iter.isValid();
-            }
-
-            @Override
-            public Entry<K, V> next() {
-                if (!hasNext())
-                    throw new NoSuchElementException();
-
-                Entry<K, V> entry = this.getEntry();
-                iter.next();
-                return entry;
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException("RocksDB iterator does not support remove");
-            }
-
-            @Override
-            public void close() {
-            }
-
-        }
-
-        private static class LexicographicComparator implements Comparator<byte[]> {
-
-            @Override
-            public int compare(byte[] left, byte[] right) {
-                for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
-                    int leftByte = left[i] & 0xff;
-                    int rightByte = right[j] & 0xff;
-                    if (leftByte != rightByte) {
-                        return leftByte - rightByte;
-                    }
-                }
-                return left.length - right.length;
-            }
-        }
-
-        private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
-            // RocksDB's JNI interface does not expose getters/setters that allow the
-            // comparator to be pluggable, and the default is lexicographic, so it's
-            // safe to just force lexicographic comparator here for now.
-            private final Comparator<byte[]> comparator = new LexicographicComparator();
-            byte[] rawToKey;
-
-            public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
-                    K from, K to) {
-                super(iter, serdes);
-                iter.seek(serdes.rawKey(from));
-                this.rawToKey = serdes.rawKey(to);
-            }
-
-            @Override
-            public boolean hasNext() {
-                return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
-            }
-        }
-
-    }
 }
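
[Editor's note] With the RocksDB internals moved out to the new top-level RocksDBStore (added below), the supplier now only composes the two layers: RocksDBStore provides persistence while MeteredKeyValueStore adds metrics and, by default, change logging. Where change logging is unwanted, the new fluent disableLogging() hook can be chained; a hypothetical supplier variant (not part of this commit) would look like:

    public StateStore get() {
        // hypothetical: same composition as above, but with change logging turned off
        return new MeteredKeyValueStore<>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time)
                .disableLogging();
    }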

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
new file mode 100644
index 0000000..40ca9f5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
+
+    private static final int TTL_NOT_USED = -1;
+
+    // TODO: these values should be configurable
+    private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
+    private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
+    private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
+    private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
+    private static final long BLOCK_SIZE = 4096L;
+    private static final int TTL_SECONDS = TTL_NOT_USED;
+    private static final int MAX_WRITE_BUFFERS = 3;
+    private static final String DB_FILE_DIR = "rocksdb";
+
+    private final String topic;
+
+    private final Options options;
+    private final WriteOptions wOptions;
+    private final FlushOptions fOptions;
+
+    private Serdes<K, V> serdes;
+    private ProcessorContext context;
+    private String dbName;
+    private String dirName;
+    private RocksDB db;
+
+    public RocksDBStore(String name, Serdes<K, V> serdes) {
+        this.topic = name;
+        this.serdes = serdes;
+
+        // initialize the rocksdb options
+        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+        tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
+        tableConfig.setBlockSize(BLOCK_SIZE);
+
+        options = new Options();
+        options.setTableFormatConfig(tableConfig);
+        options.setWriteBufferSize(WRITE_BUFFER_SIZE);
+        options.setCompressionType(COMPRESSION_TYPE);
+        options.setCompactionStyle(COMPACTION_STYLE);
+        options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
+        options.setCreateIfMissing(true);
+        options.setErrorIfExists(false);
+
+        wOptions = new WriteOptions();
+        wOptions.setDisableWAL(true);
+
+        fOptions = new FlushOptions();
+        fOptions.setWaitForFlush(true);
+    }
+
+    public void init(ProcessorContext context) {
+        serdes.init(context);
+
+        this.context = context;
+        this.dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
+        this.db = openDB(new File(this.dirName, this.topic), this.options, TTL_SECONDS);
+    }
+
+    private RocksDB openDB(File dir, Options options, int ttl) {
+        try {
+            if (ttl == TTL_NOT_USED) {
+                dir.getParentFile().mkdirs();
+                return RocksDB.open(options, dir.toString());
+            } else {
+                throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
+                // TODO: support TTL with change log?
+                // return TtlDB.open(options, dir.toString(), ttl, false);
+            }
+        } catch (RocksDBException e) {
+            // TODO: this needs to be handled more accurately
+            throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
+        }
+    }
+
+    @Override
+    public String name() {
+        return this.topic;
+    }
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+
+    @Override
+    public V get(K key) {
+        try {
+            return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
+        } catch (RocksDBException e) {
+            // TODO: this needs to be handled more accurately
+            throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
+        }
+    }
+
+    @Override
+    public void put(K key, V value) {
+        try {
+            if (value == null) {
+                db.remove(wOptions, serdes.rawKey(key));
+            } else {
+                db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
+            }
+        } catch (RocksDBException e) {
+            // TODO: this needs to be handled more accurately
+            throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
+        }
+    }
+
+    @Override
+    public void putAll(List<Entry<K, V>> entries) {
+        for (Entry<K, V> entry : entries)
+            put(entry.key(), entry.value());
+    }
+
+    @Override
+    public V delete(K key) {
+        V value = get(key);
+        put(key, null);
+        return value;
+    }
+
+    @Override
+    public KeyValueIterator<K, V> range(K from, K to) {
+        return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        RocksIterator innerIter = db.newIterator();
+        innerIter.seekToFirst();
+        return new RocksDbIterator<K, V>(innerIter, serdes);
+    }
+
+    @Override
+    public void flush() {
+        try {
+            db.flush(fOptions);
+        } catch (RocksDBException e) {
+            // TODO: this needs to be handled more accurately
+            throw new KafkaException("Error while executing flush from store " + this.topic, e);
+        }
+    }
+
+    @Override
+    public void close() {
+        flush();
+        db.close();
+    }
+
+    private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
+        private final RocksIterator iter;
+        private final Serdes<K, V> serdes;
+
+        public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
+            this.iter = iter;
+            this.serdes = serdes;
+        }
+
+        protected byte[] peekRawKey() {
+            return iter.key();
+        }
+
+        protected Entry<K, V> getEntry() {
+            return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.isValid();
+        }
+
+        @Override
+        public Entry<K, V> next() {
+            if (!hasNext())
+                throw new NoSuchElementException();
+
+            Entry<K, V> entry = this.getEntry();
+            iter.next();
+            return entry;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("RocksDB iterator does not support remove");
+        }
+
+        @Override
+        public void close() {
+        }
+
+    }
+
+    private static class LexicographicComparator implements Comparator<byte[]> {
+
+        @Override
+        public int compare(byte[] left, byte[] right) {
+            for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
+                int leftByte = left[i] & 0xff;
+                int rightByte = right[j] & 0xff;
+                if (leftByte != rightByte) {
+                    return leftByte - rightByte;
+                }
+            }
+            return left.length - right.length;
+        }
+    }
+
+    private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
+        // RocksDB's JNI interface does not expose getters/setters that allow the
+        // comparator to be pluggable, and the default is lexicographic, so it's
+        // safe to just force lexicographic comparator here for now.
+        private final Comparator<byte[]> comparator = new LexicographicComparator();
+        byte[] rawToKey;
+
+        public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
+                                    K from, K to) {
+            super(iter, serdes);
+            iter.seek(serdes.rawKey(from));
+            this.rawToKey = serdes.rawKey(to);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
+        }
+    }
+
+}
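
[Editor's note] RocksDBRangeIterator relies on the same unsigned lexicographic ordering that RocksDB's default comparator uses. A standalone illustration of why the 0xff mask matters (not part of the commit): a byte of 0x80 must compare as 128, i.e. greater than 0x7f, rather than as the negative Java byte -128.

    public class LexCompareDemo {
        // same comparison logic as LexicographicComparator above
        static int compare(byte[] left, byte[] right) {
            for (int i = 0; i < left.length && i < right.length; i++) {
                int leftByte = left[i] & 0xff;   // unsigned value 0..255
                int rightByte = right[i] & 0xff;
                if (leftByte != rightByte)
                    return leftByte - rightByte;
            }
            return left.length - right.length;   // a strict prefix sorts first
        }

        public static void main(String[] args) {
            System.out.println(compare(new byte[]{(byte) 0x80}, new byte[]{0x7f}) > 0); // true: 128 > 127
            System.out.println(compare(new byte[]{1, 2}, new byte[]{1, 2, 3}) < 0);     // true: prefix sorts first
        }
    }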

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
index f41d928..4e1b05a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-final class Serdes<K, V> {
+public final class Serdes<K, V> {
 
     public static <K, V> Serdes<K, V> withBuiltinTypes(String topic, Class<K> keyClass, Class<V> valueClass) {
         Serializer<K> keySerializer = serializer(keyClass);
@@ -73,6 +73,7 @@ final class Serdes<K, V> {
      * @param valueSerializer the serializer for values; may be null
      * @param valueDeserializer the deserializer for values; may be null
      */
+    @SuppressWarnings("unchecked")
     public Serdes(String topic,
             Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
             Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
new file mode 100644
index 0000000..590995b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KTableFilterTest {
+
+    private final Serializer<String> strSerializer = new StringSerializer();
+    private final Deserializer<String> strDeserializer = new StringDeserializer();
+    private final Serializer<Integer> intSerializer = new IntegerSerializer();
+    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
+
+    @Test
+    public void testKTable() {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+
+        KTable<String, Integer> table1 = builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1);
+
+        KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
+            @Override
+            public boolean test(String key, Integer value) {
+                return (value % 2) == 0;
+            }
+        });
+
+        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+        table2.toStream().process(proc2);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+        driver.process(topic1, "A", 1);
+        driver.process(topic1, "B", 2);
+        driver.process(topic1, "C", 3);
+        driver.process(topic1, "D", 4);
+        driver.process(topic1, "A", null);
+        driver.process(topic1, "B", null);
+
+
+        assertEquals(Utils.mkList("A:null", "B:2", "C:null", "D:4", "A:null", "B:null"), proc2.processed);
+    }
+
+    @Test
+    public void testValueGetter() throws IOException {
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+
+            String topic1 = "topic1";
+
+            KTableImpl<String, Integer, Integer> table1 =
+                    (KTableImpl<String, Integer, Integer>) builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1);
+            KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
+                    new Predicate<String, Integer>() {
+                        @Override
+                        public boolean test(String key, Integer value) {
+                            return (value % 2) == 0;
+                        }
+                    });
+
+            KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+            getter2.init(driver.context());
+
+            driver.process(topic1, "A", 1);
+            driver.process(topic1, "B", 1);
+            driver.process(topic1, "C", 1);
+
+            assertNull(getter2.get("A"));
+            assertNull(getter2.get("B"));
+            assertNull(getter2.get("C"));
+
+            driver.process(topic1, "A", 2);
+            driver.process(topic1, "B", 2);
+
+            assertEquals(new Integer(2), getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertNull(getter2.get("C"));
+
+            driver.process(topic1, "A", 3);
+
+            assertNull(getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertNull(getter2.get("C"));
+
+            driver.process(topic1, "A", null);
+            driver.process(topic1, "B", null);
+
+            assertNull(getter2.get("A"));
+            assertNull(getter2.get("B"));
+            assertNull(getter2.get("C"));
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
new file mode 100644
index 0000000..56c5703
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KTableImplTest {
+
+    @Test
+    public void testKTable() {
+        final Serializer<String> serializer = new StringSerializer();
+        final Deserializer<String> deserializer = new StringDeserializer();
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        KTable<String, String> table1 = builder.table(serializer, serializer, deserializer, deserializer, topic1);
+
+        MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
+        table1.toStream().process(proc1);
+
+        KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
+            @Override
+            public Integer apply(String value) {
+                return new Integer(value);
+            }
+        });
+
+        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+        table2.toStream().process(proc2);
+
+        KTable<String, Integer> table3 = table2.filter(new Predicate<String, Integer>() {
+            @Override
+            public boolean test(String key, Integer value) {
+                return (value % 2) == 0;
+            }
+        });
+
+        MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
+        table3.toStream().process(proc3);
+
+        KTable<String, String> table4 = table1.through(topic2, serializer, serializer, deserializer, deserializer);
+
+        MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>();
+        table4.toStream().process(proc4);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "02");
+        driver.process(topic1, "C", "03");
+        driver.process(topic1, "D", "04");
+
+        assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), proc1.processed);
+        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
+        assertEquals(Utils.mkList("A:null", "B:2", "C:null", "D:4"), proc3.processed);
+        assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), proc4.processed);
+    }
+
+    @Test
+    public void testValueGetter() throws IOException {
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final Serializer<String> serializer = new StringSerializer();
+            final Deserializer<String> deserializer = new StringDeserializer();
+            final KStreamBuilder builder = new KStreamBuilder();
+
+            String topic1 = "topic1";
+            String topic2 = "topic2";
+
+            KTableImpl<String, String, String> table1 =
+                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                    new ValueMapper<String, Integer>() {
+                        @Override
+                        public Integer apply(String value) {
+                            return new Integer(value);
+                        }
+                    });
+            KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+                    new Predicate<String, Integer>() {
+                        @Override
+                        public boolean test(String key, Integer value) {
+                            return (value % 2) == 0;
+                        }
+                    });
+            KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
+                    table1.through(topic2, serializer, serializer, deserializer, deserializer);
+
+            KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+            KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+            KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+            KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+            getter1.init(driver.context());
+            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+            getter2.init(driver.context());
+            KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+            getter3.init(driver.context());
+            KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+            getter4.init(driver.context());
+
+            driver.process(topic1, "A", "01");
+            driver.process(topic1, "B", "01");
+            driver.process(topic1, "C", "01");
+
+            assertEquals("01", getter1.get("A"));
+            assertEquals("01", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(new Integer(1), getter2.get("A"));
+            assertEquals(new Integer(1), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("01", getter4.get("A"));
+            assertEquals("01", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+            driver.process(topic1, "A", "02");
+            driver.process(topic1, "B", "02");
+
+            assertEquals("02", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(new Integer(2), getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertEquals(new Integer(2), getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("02", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+            driver.process(topic1, "A", "03");
+
+            assertEquals("03", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(new Integer(3), getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("03", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+            driver.process(topic1, "A", null);
+
+            assertNull(getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertNull(getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertNull(getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
new file mode 100644
index 0000000..1ca6643
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KTableMapValuesImplTest {
+
+    private final Serializer<String> strSerializer = new StringSerializer();
+    private final Deserializer<String> strDeserializer = new StringDeserializer();
+
+    @Test
+    public void testKTable() {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+
+        KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+        KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
+            @Override
+            public Integer apply(String value) {
+                return new Integer(value);
+            }
+        });
+
+        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+        table2.toStream().process(proc2);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "02");
+        driver.process(topic1, "C", "03");
+        driver.process(topic1, "D", "04");
+
+        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
+    }
+
+    @Test
+    public void testValueGetter() throws IOException {
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final Serializer<String> serializer = new StringSerializer();
+            final Deserializer<String> deserializer = new StringDeserializer();
+            final KStreamBuilder builder = new KStreamBuilder();
+
+            String topic1 = "topic1";
+            String topic2 = "topic2";
+
+            KTableImpl<String, String, String> table1 =
+                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                    new ValueMapper<String, Integer>() {
+                        @Override
+                        public Integer apply(String value) {
+                            return new Integer(value);
+                        }
+                    });
+            KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+                    new Predicate<String, Integer>() {
+                        @Override
+                        public boolean test(String key, Integer value) {
+                            return (value % 2) == 0;
+                        }
+                    });
+            KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
+                    table1.through(topic2, serializer, serializer, deserializer, deserializer);
+
+            KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+            KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+            KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+            KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+            getter1.init(driver.context());
+            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+            getter2.init(driver.context());
+            KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+            getter3.init(driver.context());
+            KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+            getter4.init(driver.context());
+
+            driver.process(topic1, "A", "01");
+            driver.process(topic1, "B", "01");
+            driver.process(topic1, "C", "01");
+
+            assertEquals("01", getter1.get("A"));
+            assertEquals("01", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(new Integer(1), getter2.get("A"));
+            assertEquals(new Integer(1), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("01", getter4.get("A"));
+            assertEquals("01", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+            driver.process(topic1, "A", "02");
+            driver.process(topic1, "B", "02");
+
+            assertEquals("02", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(new Integer(2), getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertEquals(new Integer(2), getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("02", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+            driver.process(topic1, "A", "03");
+
+            assertEquals("03", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(new Integer(3), getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("03", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+            driver.process(topic1, "A", null);
+
+            assertNull(getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertNull(getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertNull(getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
new file mode 100644
index 0000000..97aca3d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KTableSourceTest {
+
+    private final Serializer<String> strSerializer = new StringSerializer();
+    private final Deserializer<String> strDeserializer = new StringDeserializer();
+
+    @Test
+    public void testKTable() {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+
+        KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+
+        MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
+        table1.toStream().process(proc1);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+        driver.process(topic1, "A", 1);
+        driver.process(topic1, "B", 2);
+        driver.process(topic1, "C", 3);
+        driver.process(topic1, "D", 4);
+        driver.process(topic1, "A", null);
+        driver.process(topic1, "B", null);
+
+        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"), proc1.processed);
+    }
+
+    @Test
+    public void testValueGetter() throws IOException {
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+
+            String topic1 = "topic1";
+
+            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>)
+                    builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+
+            KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+            getter1.init(driver.context());
+
+            driver.process(topic1, "A", "01");
+            driver.process(topic1, "B", "01");
+            driver.process(topic1, "C", "01");
+
+            assertEquals("01", getter1.get("A"));
+            assertEquals("01", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            driver.process(topic1, "A", "02");
+            driver.process(topic1, "B", "02");
+
+            assertEquals("02", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            driver.process(topic1, "A", "03");
+
+            assertEquals("03", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            driver.process(topic1, "A", null);
+            driver.process(topic1, "B", null);
+
+            assertNull(getter1.get("A"));
+            assertNull(getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
+}
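
The valueGetterSupplier() path exercised above is what lets other parts of the topology read a KTable's current value for a key. As a rough sketch, mirroring the pattern in testValueGetter() rather than adding code from this commit:

    KTableValueGetter<String, String> getter = table1.valueGetterSupplier().get();
    getter.init(driver.context());      // bind the getter to the driver's processor context
    String latest = getter.get("A");    // most recent value for "A", or null after a delete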

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
index aa484fc..43ffa7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
@@ -61,6 +61,8 @@ public class KafkaStreamingPartitionAssignorTest {
     private TopicPartition t2p2 = new TopicPartition("topic2", 2);
     private TopicPartition t2p3 = new TopicPartition("topic2", 3);
 
+    private Set<String> allTopics = Utils.mkSet("topic1", "topic2");
+
     private List<PartitionInfo> infos = Arrays.asList(
             new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
             new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
@@ -203,47 +205,26 @@ public class KafkaStreamingPartitionAssignorTest {
         assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
 
         // check assignment info
-        Set<TaskId> allActiveTasks = new HashSet<>();
-        AssignmentInfo info;
-
-        List<TaskId> activeTasks = new ArrayList<>();
-        for (TopicPartition partition : assignments.get("consumer10").partitions()) {
-            activeTasks.add(new TaskId(0, partition.partition()));
-        }
-        info = AssignmentInfo.decode(assignments.get("consumer10").userData());
-        assertEquals(activeTasks, info.activeTasks);
-        assertEquals(2, info.activeTasks.size());
-        assertEquals(1, new HashSet<>(info.activeTasks).size());
-        assertEquals(0, info.standbyTasks.size());
 
-        allActiveTasks.addAll(info.activeTasks);
+        Set<TaskId> allActiveTasks = new HashSet<>();
 
-        activeTasks.clear();
-        for (TopicPartition partition : assignments.get("consumer11").partitions()) {
-            activeTasks.add(new TaskId(0, partition.partition()));
-        }
-        info = AssignmentInfo.decode(assignments.get("consumer11").userData());
-        assertEquals(activeTasks, info.activeTasks);
-        assertEquals(2, info.activeTasks.size());
-        assertEquals(1, new HashSet<>(info.activeTasks).size());
-        assertEquals(0, info.standbyTasks.size());
+        // the first consumer
+        AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
+        allActiveTasks.addAll(info10.activeTasks);
 
-        allActiveTasks.addAll(info.activeTasks);
+        // the second consumer
+        AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
+        allActiveTasks.addAll(info11.activeTasks);
 
         // check active tasks assigned to the first client
         assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
 
-        activeTasks.clear();
-        for (TopicPartition partition : assignments.get("consumer20").partitions()) {
-            activeTasks.add(new TaskId(0, partition.partition()));
-        }
-        info = AssignmentInfo.decode(assignments.get("consumer20").userData());
-        assertEquals(activeTasks, info.activeTasks);
-        assertEquals(2, info.activeTasks.size());
-        assertEquals(1, new HashSet<>(info.activeTasks).size());
-        assertEquals(0, info.standbyTasks.size());
+        // the third consumer
+        AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
+        allActiveTasks.addAll(info20.activeTasks);
 
-        allActiveTasks.addAll(info.activeTasks);
+        assertEquals(3, allActiveTasks.size());
+        assertEquals(allTasks, new HashSet<>(allActiveTasks));
 
         assertEquals(3, allActiveTasks.size());
         assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -266,6 +247,7 @@ public class KafkaStreamingPartitionAssignorTest {
         List<String> topics = Utils.mkList("topic1", "topic2");
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
+
         final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
         final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
         final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
@@ -291,55 +273,29 @@ public class KafkaStreamingPartitionAssignorTest {
 
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
-        // check assigned partitions
-
-        assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
-                Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
-        assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
-
-        // check assignment info
         Set<TaskId> allActiveTasks = new HashSet<>();
         Set<TaskId> allStandbyTasks = new HashSet<>();
-        AssignmentInfo info;
 
-        List<TaskId> activeTasks = new ArrayList<>();
-        for (TopicPartition partition : assignments.get("consumer10").partitions()) {
-            activeTasks.add(new TaskId(0, partition.partition()));
-        }
-        info = AssignmentInfo.decode(assignments.get("consumer10").userData());
-        assertEquals(activeTasks, info.activeTasks);
-        assertEquals(2, info.activeTasks.size());
-        assertEquals(1, new HashSet<>(info.activeTasks).size());
+        // the first consumer
+        AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
+        allActiveTasks.addAll(info10.activeTasks);
+        allStandbyTasks.addAll(info10.standbyTasks.keySet());
 
-        allActiveTasks.addAll(info.activeTasks);
-        allStandbyTasks.addAll(info.standbyTasks);
+        // the second consumer
+        AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
+        allActiveTasks.addAll(info11.activeTasks);
+        allStandbyTasks.addAll(info11.standbyTasks.keySet());
 
-        activeTasks.clear();
-        for (TopicPartition partition : assignments.get("consumer11").partitions()) {
-            activeTasks.add(new TaskId(0, partition.partition()));
-        }
-        info = AssignmentInfo.decode(assignments.get("consumer11").userData());
-        assertEquals(activeTasks, info.activeTasks);
-        assertEquals(2, info.activeTasks.size());
-        assertEquals(1, new HashSet<>(info.activeTasks).size());
-
-        allActiveTasks.addAll(info.activeTasks);
-        allStandbyTasks.addAll(info.standbyTasks);
-
-        // check tasks assigned to the first client
+        // check active tasks assigned to the first client
         assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
+        assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks));
 
-        activeTasks.clear();
-        for (TopicPartition partition : assignments.get("consumer20").partitions()) {
-            activeTasks.add(new TaskId(0, partition.partition()));
-        }
-        info = AssignmentInfo.decode(assignments.get("consumer20").userData());
-        assertEquals(activeTasks, info.activeTasks);
-        assertEquals(2, info.activeTasks.size());
-        assertEquals(1, new HashSet<>(info.activeTasks).size());
+        // the third consumer
+        AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
+        allActiveTasks.addAll(info20.activeTasks);
+        allStandbyTasks.addAll(info20.standbyTasks.keySet());
 
-        allActiveTasks.addAll(info.activeTasks);
-        allStandbyTasks.addAll(info.standbyTasks);
+        // all task ids are in the active tasks and also in the standby tasks
 
         assertEquals(3, allActiveTasks.size());
         assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -348,6 +304,48 @@ public class KafkaStreamingPartitionAssignorTest {
         assertEquals(allTasks, new HashSet<>(allStandbyTasks));
     }
 
+    private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
+
+        // This assumes 1) DefaultPartitionGrouper is used, and 2) there is only one topic group.
+
+        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
+        // check if the number of assigned partitions == the size of active task id list
+        assertEquals(assignment.partitions().size(), info.activeTasks.size());
+
+        // check if active tasks are consistent
+        List<TaskId> activeTasks = new ArrayList<>();
+        Set<String> activeTopics = new HashSet<>();
+        for (TopicPartition partition : assignment.partitions()) {
+            // with the default grouper, taskId.partition == partition.partition()
+            activeTasks.add(new TaskId(0, partition.partition()));
+            activeTopics.add(partition.topic());
+        }
+        assertEquals(activeTasks, info.activeTasks);
+
+        // check if active partitions cover all topics
+        assertEquals(allTopics, activeTopics);
+
+        // check if standby tasks are consistent
+        Set<String> standbyTopics = new HashSet<>();
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
+            TaskId id = entry.getKey();
+            Set<TopicPartition> partitions = entry.getValue();
+            for (TopicPartition partition : partitions) {
+                // with the default grouper, taskId.partition == partition.partition()
+                assertEquals(id.partition, partition.partition());
+
+                standbyTopics.add(partition.topic());
+            }
+        }
+
+        // check if standby partitions cover all topics
+        if (info.standbyTasks.size() > 0)
+            assertEquals(allTopics, standbyTopics);
+
+        return info;
+    }
+
     @Test
     public void testOnAssignment() throws Exception {
         StreamingConfig config = new StreamingConfig(configProps());
@@ -369,7 +367,10 @@ public class KafkaStreamingPartitionAssignorTest {
         partitionAssignor.configure(config.getConsumerConfigs(thread));
 
         List<TaskId> activeTaskList = Utils.mkList(task0, task3);
-        Set<TaskId> standbyTasks = Utils.mkSet(task1, task2);
+        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+        standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
+        standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));
+
         AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks);
         PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
         partitionAssignor.onAssignment(assignment);
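
The checkAssignment() helper introduced above assumes the DefaultPartitionGrouper and a single topic group, so each assigned partition maps back to a task id with the same partition number. Restated on its own, as a sketch using the test's fields:

    // Holds only for DefaultPartitionGrouper with a single topic group (group id 0).
    AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
    List<TaskId> expected = new ArrayList<>();
    for (TopicPartition partition : assignment.partitions()) {
        expected.add(new TaskId(0, partition.partition()));
    }
    assertEquals(expected, info.activeTasks);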

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index b8a6990..9a43e46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -41,6 +43,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 
@@ -53,6 +56,7 @@ public class StandbyTaskTest {
     private final TopicPartition partition1 = new TopicPartition("store1", 1);
     private final TopicPartition partition2 = new TopicPartition("store2", 1);
 
+    private final Set<TopicPartition> topicPartitions = Collections.emptySet();
     private final ProcessorTopology topology = new ProcessorTopology(
             Collections.<ProcessorNode>emptyList(),
             Collections.<String, SourceNode>emptyMap(),
@@ -78,6 +82,7 @@ public class StandbyTaskTest {
         });
     }
 
+    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final ProcessorStateManagerTest.MockRestoreConsumer restoreStateConsumer = new ProcessorStateManagerTest.MockRestoreConsumer();
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
@@ -104,7 +109,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null);
+            StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions()));
 
@@ -119,7 +124,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null);
+            StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
@@ -138,7 +143,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null);
+            StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a95c2fa..aae5a7d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -103,7 +103,7 @@ public class StreamTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(0, 0), consumer, producer, restoreStateConsumer, partitions, topology, config, null);
+            StreamTask task = new StreamTask(new TaskId(0, 0), partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
@@ -154,7 +154,7 @@ public class StreamTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(1, 1), consumer, producer, restoreStateConsumer, partitions, topology, config, null);
+            StreamTask task = new StreamTask(new TaskId(1, 1), partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
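
Both task types now take the task id, its partitions and its topology first, followed by the Kafka clients and the config. As suggested by the updated call sites (parameter names are inferred from those call sites, not copied from the class sources):

    new StreamTask(taskId, partitions, topology, consumer, producer, restoreConsumer, config, metrics);
    new StandbyTask(taskId, partitions, topology, consumer, restoreConsumer, config, metrics);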

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 02d0ac7..9f31450 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -126,13 +126,13 @@ public class StreamThreadTest {
         public boolean committed = false;
 
         public TestStreamTask(TaskId id,
+                              Collection<TopicPartition> partitions,
+                              ProcessorTopology topology,
                               Consumer<byte[], byte[]> consumer,
                               Producer<byte[], byte[]> producer,
                               Consumer<byte[], byte[]> restoreConsumer,
-                              Collection<TopicPartition> partitions,
-                              ProcessorTopology topology,
                               StreamingConfig config) {
-            super(id, consumer, producer, restoreConsumer, partitions, topology, config, null);
+            super(id, partitions, topology, consumer, producer, restoreConsumer, config, null);
         }
 
         @Override
@@ -163,7 +163,7 @@ public class StreamThreadTest {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                 ProcessorTopology topology = builder.build(id.topicGroupId);
-                return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
+                return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
             }
         };
 
@@ -288,7 +288,7 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
+                    return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
                 }
             };
 
@@ -410,7 +410,7 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
+                    return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
                 }
             };
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
index 58e0af9..14a7f9a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
@@ -17,12 +17,15 @@
 
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.TaskId;
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
@@ -33,8 +36,10 @@ public class AssginmentInfoTest {
     public void testEncodeDecode() {
         List<TaskId> activeTasks =
                 Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
-        Set<TaskId> standbyTasks =
-                new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0)));
+        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+        standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
+        standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
 
         AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks);
         AssignmentInfo decoded = AssignmentInfo.decode(info.encode());
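
Standby tasks in AssignmentInfo change from a Set<TaskId> to a Map<TaskId, Set<TopicPartition>>, so each standby task carries the changelog partitions it should restore. A minimal round trip in the spirit of the test above:

    Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
    standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1)));

    AssignmentInfo decoded = AssignmentInfo.decode(new AssignmentInfo(activeTasks, standbyTasks).encode());
    // decoded.activeTasks and decoded.standbyTasks should equal the inputs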

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index ca5f33d..119f08f 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -17,28 +17,44 @@
 
 package org.apache.kafka.test;
 
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
 
+import java.io.File;
 import java.util.List;
 
 public class KStreamTestDriver {
 
     private final ProcessorTopology topology;
     private final MockProcessorContext context;
+    public final File stateDir;
+
     private ProcessorNode currNode;
 
     public KStreamTestDriver(KStreamBuilder builder) {
-        this(builder, null, null);
+        this(builder, null, null, null, null, null);
     }
 
-    public KStreamTestDriver(KStreamBuilder builder, Serializer<?> serializer, Deserializer<?> deserializer) {
+    public KStreamTestDriver(KStreamBuilder builder,
+                             File stateDir,
+                             Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
+                             Serializer<?> valSerializer, Deserializer<?> valDeserializer) {
         this.topology = builder.build(null);
-        this.context = new MockProcessorContext(this, serializer, deserializer);
+        this.context = new MockProcessorContext(this, keySerializer, keyDeserializer, valSerializer, valDeserializer, new MockRecordCollector());
+        this.stateDir = stateDir;
+
+        for (StateStoreSupplier stateStoreSupplier : topology.stateStoreSuppliers()) {
+            StateStore store = stateStoreSupplier.get();
+            store.init(context);
+        }
 
         for (ProcessorNode node : topology.processors()) {
             currNode = node;
@@ -50,6 +66,10 @@ public class KStreamTestDriver {
         }
     }
 
+    public ProcessorContext context() {
+        return context;
+    }
+
     public void process(String topicName, Object key, Object value) {
         currNode = topology.source(topicName);
         try {
@@ -92,4 +112,21 @@ public class KStreamTestDriver {
         }
     }
 
+    private class MockRecordCollector extends RecordCollector {
+        public MockRecordCollector() {
+            super(null);
+        }
+
+        public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+            // The serialization is skipped.
+            process(record.topic(), record.key(), record.value());
+        }
+
+        public void flush() {
+        }
+
+        public void close() {
+        }
+    }
+
 }
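
With a state directory, separate key and value serdes, and the new context() accessor, the driver can back real state stores in unit tests. A usage sketch following KTableSourceTest above:

    File stateDir = Files.createTempDirectory("test").toFile();
    KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
    driver.process("topic1", "A", "01");
    ProcessorContext context = driver.context();   // usable by stores and value getters in tests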

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 40f11a0..81a9add 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.test;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -109,17 +110,24 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     @Override
     public File stateDir() {
-        throw new UnsupportedOperationException("stateDir() not supported.");
+        return driver.stateDir;
     }
 
     @Override
     public StreamingMetrics metrics() {
-        throw new UnsupportedOperationException("metrics() not supported.");
+        return new StreamingMetrics() {
+            @Override
+            public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
+                return null;
+            }
+            @Override
+            public void recordLatency(Sensor sensor, long startNs, long endNs) {
+            }
+        };
     }
 
     @Override
     public void register(StateStore store, StateRestoreCallback func) {
-        if (func != null) throw new UnsupportedOperationException("StateRestoreCallback not supported.");
         storeMap.put(store.name(), store);
     }
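
Since stateDir(), metrics() and register() no longer throw, a store can be initialized directly against the mock context. A rough sketch; the supplier below is a placeholder for whatever StateStoreSupplier the topology under test provides:

    StateStore store = stateStoreSupplier.get();
    store.init(context);   // the store is expected to register itself via context.register(...)
    // the stubbed StreamingMetrics returns null sensors and recordLatency() does nothing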
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 5f796c6..fdb4d57 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -158,11 +158,11 @@ public class ProcessorTopologyTestDriver {
         }
 
         task = new StreamTask(id,
+            partitionsByTopic.values(),
+            topology,
             consumer,
             producer,
             restoreStateConsumer,
-            partitionsByTopic.values(),
-            topology,
             config,
             new StreamingMetrics() {
                 @Override

