kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [4/8] kafka git commit: MINOR: remove Kafka Streams in 0.9.0
Date: Tue, 10 Nov 2015 00:27:30 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
deleted file mode 100644
index fe8f00a..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionType;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteOptions;
-
-import java.io.File;
-import java.util.Comparator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * A {@link StateStoreSupplier} for a {@link KeyValueStore} that stores all entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
- */
-public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
-
-    private final String name;
-    private final Serdes<K, V> serdes;
-    private final Time time;
-
-    protected RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
-        this.name = name;
-        this.serdes = serdes;
-        this.time = time;
-    }
-
-    public String name() {
-        return name;
-    }
-
-    public StateStore get() {
-        return new MeteredKeyValueStore<K, V>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
-    }
-
-    private static class RocksDBStore<K, V> implements KeyValueStore<K, V> {
-
-        private static final int TTL_NOT_USED = -1;
-
-        // TODO: these values should be configurable
-        private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
-        private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
-        private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
-        private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
-        private static final long BLOCK_SIZE = 4096L;
-        private static final int TTL_SECONDS = TTL_NOT_USED;
-        private static final int MAX_WRITE_BUFFERS = 3;
-        private static final String DB_FILE_DIR = "rocksdb";
-
-        private final Serdes<K, V> serdes;
-        private final String topic;
-
-        private final Options options;
-        private final WriteOptions wOptions;
-        private final FlushOptions fOptions;
-
-        private ProcessorContext context;
-        private int partition;
-        private String dbName;
-        private String dirName;
-        private RocksDB db;
-
-        public RocksDBStore(String name, Serdes<K, V> serdes) {
-            this.topic = name;
-            this.serdes = serdes;
-
-            // initialize the rocksdb options
-            BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
-            tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
-            tableConfig.setBlockSize(BLOCK_SIZE);
-
-            options = new Options();
-            options.setTableFormatConfig(tableConfig);
-            options.setWriteBufferSize(WRITE_BUFFER_SIZE);
-            options.setCompressionType(COMPRESSION_TYPE);
-            options.setCompactionStyle(COMPACTION_STYLE);
-            options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
-            options.setCreateIfMissing(true);
-            options.setErrorIfExists(false);
-
-            wOptions = new WriteOptions();
-            wOptions.setDisableWAL(true);
-
-            fOptions = new FlushOptions();
-            fOptions.setWaitForFlush(true);
-        }
-
-        public void init(ProcessorContext context) {
-            this.context = context;
-            this.partition = context.id().partition;
-            this.dbName = this.topic + "." + this.partition;
-            this.dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
-            this.db = openDB(new File(dirName, dbName), this.options, TTL_SECONDS);
-        }
-
-        private RocksDB openDB(File dir, Options options, int ttl) {
-            try {
-                if (ttl == TTL_NOT_USED) {
-                    dir.getParentFile().mkdirs();
-                    return RocksDB.open(options, dir.toString());
-                } else {
-                    throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
-                    // TODO: support TTL with change log?
-                    // return TtlDB.open(options, dir.toString(), ttl, false);
-                }
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
-            }
-        }
-
-        @Override
-        public String name() {
-            return this.topic;
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-
-        @Override
-        public V get(K key) {
-            try {
-                return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
-            }
-        }
-
-        @Override
-        public void put(K key, V value) {
-            try {
-                if (value == null) {
-                    db.remove(wOptions, serdes.rawKey(key));
-                } else {
-                    db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
-                }
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error while executing put " + key.toString() + " to store " + this.topic, e);
-            }
-        }
-
-        @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
-        }
-
-        @Override
-        public V delete(K key) {
-            V value = get(key);
-            put(key, null);
-            return value;
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(K from, K to) {
-            return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            RocksIterator innerIter = db.newIterator();
-            innerIter.seekToFirst();
-            return new RocksDbIterator<K, V>(innerIter, serdes);
-        }
-
-        @Override
-        public void flush() {
-            try {
-                db.flush(fOptions);
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error while executing flush from store " + this.topic, e);
-            }
-        }
-
-        @Override
-        public void close() {
-            flush();
-            db.close();
-        }
-
-        private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
-            private final RocksIterator iter;
-            private final Serdes<K, V> serdes;
-
-            public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
-                this.iter = iter;
-                this.serdes = serdes;
-            }
-
-            protected byte[] peekRawKey() {
-                return iter.key();
-            }
-
-            protected Entry<K, V> getEntry() {
-                return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
-            }
-
-            @Override
-            public boolean hasNext() {
-                return iter.isValid();
-            }
-
-            @Override
-            public Entry<K, V> next() {
-                if (!hasNext())
-                    throw new NoSuchElementException();
-
-                Entry<K, V> entry = this.getEntry();
-                iter.next();
-                return entry;
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException("RocksDB iterator does not support remove");
-            }
-
-            @Override
-            public void close() {
-            }
-
-        }
-
-        private static class LexicographicComparator implements Comparator<byte[]> {
-
-            @Override
-            public int compare(byte[] left, byte[] right) {
-                for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
-                    int leftByte = left[i] & 0xff;
-                    int rightByte = right[j] & 0xff;
-                    if (leftByte != rightByte) {
-                        return leftByte - rightByte;
-                    }
-                }
-                return left.length - right.length;
-            }
-        }
-
-        private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
-            // RocksDB's JNI interface does not expose getters/setters that allow the
-            // comparator to be pluggable, and the default is lexicographic, so it's
-            // safe to just force the lexicographic comparator here for now.
-            private final Comparator<byte[]> comparator = new LexicographicComparator();
-            byte[] rawToKey;
-
-            public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
-                    K from, K to) {
-                super(iter, serdes);
-                iter.seek(serdes.rawKey(from));
-                this.rawToKey = serdes.rawKey(to);
-            }
-
-            @Override
-            public boolean hasNext() {
-                return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
-            }
-        }
-
-    }
-}

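For reference, the LexicographicComparator above masks each byte with 0xff because Java bytes are signed: without the mask, any key byte with the high bit set would compare as negative and sort before all ASCII bytes, breaking the ordering the range iterator relies on. A standalone sketch of the difference (not part of this commit):

    public class UnsignedByteOrderDemo {
        public static void main(String[] args) {
            byte a = (byte) 0x01;   // 1 both signed and unsigned
            byte b = (byte) 0x80;   // -128 signed, 128 unsigned

            System.out.println(Byte.compare(a, b) < 0);        // false: signed order puts 0x80 first
            System.out.println(((a & 0xff) - (b & 0xff)) < 0); // true: unsigned order, as RocksDB expects
        }
    }
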
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
deleted file mode 100644
index 31bd439..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamingConfig;
-
-final class Serdes<K, V> {
-
-    public static <K, V> Serdes<K, V> withBuiltinTypes(String topic, Class<K> keyClass, Class<V> valueClass) {
-        Serializer<K> keySerializer = serializer(keyClass);
-        Deserializer<K> keyDeserializer = deserializer(keyClass);
-        Serializer<V> valueSerializer = serializer(valueClass);
-        Deserializer<V> valueDeserializer = deserializer(valueClass);
-        return new Serdes<>(topic, keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
-    }
-
-    @SuppressWarnings("unchecked")
-    static <T> Serializer<T> serializer(Class<T> type) {
-        if (String.class.isAssignableFrom(type)) return (Serializer<T>) new StringSerializer();
-        if (Integer.class.isAssignableFrom(type)) return (Serializer<T>) new IntegerSerializer();
-        if (Long.class.isAssignableFrom(type)) return (Serializer<T>) new LongSerializer();
-        if (byte[].class.isAssignableFrom(type)) return (Serializer<T>) new ByteArraySerializer();
-        throw new IllegalArgumentException("Unknown class for built-in serializer");
-    }
-
-    @SuppressWarnings("unchecked")
-    static <T> Deserializer<T> deserializer(Class<T> type) {
-        if (String.class.isAssignableFrom(type)) return (Deserializer<T>) new StringDeserializer();
-        if (Integer.class.isAssignableFrom(type)) return (Deserializer<T>) new IntegerDeserializer();
-        if (Long.class.isAssignableFrom(type)) return (Deserializer<T>) new LongDeserializer();
-        if (byte[].class.isAssignableFrom(type)) return (Deserializer<T>) new ByteArrayDeserializer();
-        throw new IllegalArgumentException("Unknown class for built-in deserializer");
-    }
-
-    private final String topic;
-    private final Serializer<K> keySerializer;
-    private final Serializer<V> valueSerializer;
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valueDeserializer;
-
-    /**
-     * Create a context for serialization using the specified serializers and deserializers.
-     *
-     * @param topic the name of the topic
-     * @param keySerializer the serializer for keys; may not be null
-     * @param keyDeserializer the deserializer for keys; may not be null
-     * @param valueSerializer the serializer for values; may not be null
-     * @param valueDeserializer the deserializer for values; may not be null
-     */
-    public Serdes(String topic,
-            Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
-            Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) {
-        this.topic = topic;
-        this.keySerializer = keySerializer;
-        this.keyDeserializer = keyDeserializer;
-        this.valueSerializer = valueSerializer;
-        this.valueDeserializer = valueDeserializer;
-    }
-
-    /**
-     * Create a context for serialization using the specified serializers and deserializers, or if any of them are null the
-     * corresponding {@link StreamingConfig}'s serializer or deserializer, which
-     * <em>must</em> match the key and value types used as parameters for this object.
-     *
-     * @param topic the name of the topic
-     * @param keySerializer the serializer for keys; may be null if the {@link StreamingConfig#keySerializer() default
-     *            key serializer} should be used
-     * @param keyDeserializer the deserializer for keys; may be null if the {@link StreamingConfig#keyDeserializer() default
-     *            key deserializer} should be used
-     * @param valueSerializer the serializer for values; may be null if the {@link StreamingConfig#valueSerializer() default
-     *            value serializer} should be used
-     * @param valueDeserializer the deserializer for values; may be null if the {@link StreamingConfig#valueDeserializer()
-     *            default value deserializer} should be used
-     * @param config the streaming config
-     */
-    @SuppressWarnings("unchecked")
-    public Serdes(String topic,
-            Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
-            Serializer<V> valueSerializer, Deserializer<V> valueDeserializer,
-            StreamingConfig config) {
-        this.topic = topic;
-
-        this.keySerializer = keySerializer != null ? keySerializer : config.keySerializer();
-        this.keyDeserializer = keyDeserializer != null ? keyDeserializer : config.keyDeserializer();
-        this.valueSerializer = valueSerializer != null ? valueSerializer : config.valueSerializer();
-        this.valueDeserializer = valueDeserializer != null ? valueDeserializer : config.valueDeserializer();
-    }
-
-    /**
-     * Create a context for serialization using the {@link StreamingConfig}'s serializers and deserializers, which
-     * <em>must</em> match the key and value types used as parameters for this object.
-     *
-     * @param topic the name of the topic
-     * @param config the streaming config
-     */
-    @SuppressWarnings("unchecked")
-    public Serdes(String topic,
-                  StreamingConfig config) {
-        this(topic, null, null, null, null, config);
-    }
-
-    public Deserializer<K> keyDeserializer() {
-        return keyDeserializer;
-    }
-
-    public Serializer<K> keySerializer() {
-        return keySerializer;
-    }
-
-    public Deserializer<V> valueDeserializer() {
-        return valueDeserializer;
-    }
-
-    public Serializer<V> valueSerializer() {
-        return valueSerializer;
-    }
-
-    public String topic() {
-        return topic;
-    }
-
-    public K keyFrom(byte[] rawKey) {
-        return keyDeserializer.deserialize(topic, rawKey);
-    }
-
-    public V valueFrom(byte[] rawValue) {
-        return valueDeserializer.deserialize(topic, rawValue);
-    }
-
-    public byte[] rawKey(K key) {
-        return keySerializer.serialize(topic, key);
-    }
-
-    public byte[] rawValue(V value) {
-        return valueSerializer.serialize(topic, value);
-    }
-}

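For reference, Serdes here just bundles one serializer/deserializer pair per side together with the topic name, and rawKey()/keyFrom() (likewise rawValue()/valueFrom()) are plain round trips through those delegates. A standalone sketch using the public serializers the class resolves to (Serdes itself is package-private):

    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class RoundTripDemo {
        public static void main(String[] args) {
            StringSerializer serializer = new StringSerializer();
            StringDeserializer deserializer = new StringDeserializer();

            byte[] raw = serializer.serialize("my-topic", "some-key"); // what rawKey(key) does
            String back = deserializer.deserialize("my-topic", raw);   // what keyFrom(raw) does

            System.out.println("some-key".equals(back)); // true
        }
    }
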
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
deleted file mode 100644
index c5f040f..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-/**
- * Factory for creating key-value stores.
- */
-public class Stores {
-
-    /**
-     * Begin to create a new {@link org.apache.kafka.streams.processor.StateStoreSupplier} instance.
-     *
-     * @param name the name of the store
-     * @return the factory that can be used to specify other options or configurations for the store; never null
-     */
-    public static StoreFactory create(final String name, final StreamingConfig config) {
-        return new StoreFactory() {
-            @Override
-            public <K> ValueFactory<K> withKeys(final Serializer<K> keySerializer, final Deserializer<K> keyDeserializer) {
-                return new ValueFactory<K>() {
-                    @Override
-                    public <V> KeyValueFactory<K, V> withValues(final Serializer<V> valueSerializer,
-                                                                final Deserializer<V> valueDeserializer) {
-                        final Serdes<K, V> serdes =
-                                new Serdes<>(name, keySerializer, keyDeserializer, valueSerializer, valueDeserializer, config);
-                        return new KeyValueFactory<K, V>() {
-                            @Override
-                            public InMemoryKeyValueFactory<K, V> inMemory() {
-                                return new InMemoryKeyValueFactory<K, V>() {
-                                    private int capacity = Integer.MAX_VALUE;
-
-                                    @Override
-                                    public InMemoryKeyValueFactory<K, V> maxEntries(int capacity) {
-                                        if (capacity < 1) throw new IllegalArgumentException("The capacity must be positive");
-                                        this.capacity = capacity;
-                                        return this;
-                                    }
-
-                                    @Override
-                                    public StateStoreSupplier build() {
-                                        if (capacity < Integer.MAX_VALUE) {
-                                            return new InMemoryLRUCacheStoreSupplier<>(name, capacity, serdes, null);
-                                        }
-                                        return new InMemoryKeyValueStoreSupplier<>(name, serdes, null);
-                                    }
-                                };
-                            }
-
-                            @Override
-                            public LocalDatabaseKeyValueFactory<K, V> localDatabase() {
-                                return new LocalDatabaseKeyValueFactory<K, V>() {
-                                    @Override
-                                    public StateStoreSupplier build() {
-                                        return new RocksDBKeyValueStoreSupplier<>(name, serdes, null);
-                                    }
-                                };
-                            }
-                        };
-                    }
-                };
-            }
-        };
-    }
-
-    public static abstract class StoreFactory {
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying that the keys will be {@link String}s.
-         *
-         * @return the interface used to specify the type of values; never null
-         */
-        public ValueFactory<String> withStringKeys() {
-            return withKeys(new StringSerializer(), new StringDeserializer());
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying that the keys will be {@link Integer}s.
-         *
-         * @return the interface used to specify the type of values; never null
-         */
-        public ValueFactory<Integer> withIntegerKeys() {
-            return withKeys(new IntegerSerializer(), new IntegerDeserializer());
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying that the keys will be {@link Long}s.
-         *
-         * @return the interface used to specify the type of values; never null
-         */
-        public ValueFactory<Long> withLongKeys() {
-            return withKeys(new LongSerializer(), new LongDeserializer());
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying that the keys will be byte arrays.
-         *
-         * @return the interface used to specify the type of values; never null
-         */
-        public ValueFactory<byte[]> withByteArrayKeys() {
-            return withKeys(new ByteArraySerializer(), new ByteArrayDeserializer());
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying that the keys will be either {@link String}, {@link Integer},
-         * {@link Long}, or {@code byte[]}.
-         *
-         * @param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serializers and
-         *            deserializers (e.g., {@code String.class}, {@code Integer.class}, {@code Long.class}, or
-         *            {@code byte[].class})
-         * @return the interface used to specify the type of values; never null
-         */
-        public <K> ValueFactory<K> withKeys(Class<K> keyClass) {
-            return withKeys(Serdes.serializer(keyClass), Serdes.deserializer(keyClass));
-        }
-
-        /**
-         * Begin to create a {@link KeyValueStore} by specifying the serializer and deserializer for the keys.
-         *
-         * @param keySerializer the serializer for keys; may not be null
-         * @param keyDeserializer the deserializer for keys; may not be null
-         * @return the interface used to specify the type of values; never null
-         */
-        public abstract <K> ValueFactory<K> withKeys(Serializer<K> keySerializer, Deserializer<K> keyDeserializer);
-    }
-
-    /**
-     * The factory used to specify the type of values in a key-value store.
-     *
-     * @param <K> the type of keys
-     */
-    public static abstract class ValueFactory<K> {
-        /**
-         * Use {@link String} values.
-         *
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public KeyValueFactory<K, String> withStringValues() {
-            return withValues(new StringSerializer(), new StringDeserializer());
-        }
-
-        /**
-         * Use {@link Integer} values.
-         *
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public KeyValueFactory<K, Integer> withIntegerValues() {
-            return withValues(new IntegerSerializer(), new IntegerDeserializer());
-        }
-
-        /**
-         * Use {@link Long} values.
-         *
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public KeyValueFactory<K, Long> withLongValues() {
-            return withValues(new LongSerializer(), new LongDeserializer());
-        }
-
-        /**
-         * Use byte arrays for values.
-         *
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public KeyValueFactory<K, byte[]> withByteArrayValues() {
-            return withValues(new ByteArraySerializer(), new ByteArrayDeserializer());
-        }
-
-        /**
-         * Use values of the specified type, which must be either {@link String}, {@link Integer},
-         * {@link Long}, or {@code byte[]}.
-         *
-         * @param valueClass the class for the values, which must be one of the types for which Kafka has built-in serializers and
-         *            deserializers (e.g., {@code String.class}, {@code Integer.class}, {@code Long.class}, or
-         *            {@code byte[].class})
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public <V> KeyValueFactory<K, V> withValues(Class<V> valueClass) {
-            return withValues(Serdes.serializer(valueClass), Serdes.deserializer(valueClass));
-        }
-
-        /**
-         * Use the specified serializer and deserializer for the values.
-         *
-         * @param valueSerializer the serializer for value; may not be null
-         * @param valueDeserializer the deserializer for values; may not be null
-         * @return the interface used to specify the remaining key-value store options; never null
-         */
-        public abstract <V> KeyValueFactory<K, V> withValues(Serializer<V> valueSerializer, Deserializer<V> valueDeserializer);
-    }
-
-    /**
-     * The interface used to specify the different kinds of key-value stores.
-     *
-     * @param <K> the type of keys
-     * @param <V> the type of values
-     */
-    public static interface KeyValueFactory<K, V> {
-        /**
-         * Keep all key-value entries in-memory, although for durability all entries are recorded in a Kafka topic that can be
-         * read to restore the entries if they are lost.
-         *
-         * @return the factory to create in-memory key-value stores; never null
-         */
-        InMemoryKeyValueFactory<K, V> inMemory();
-
-        /**
-         * Keep all key-value entries off-heap in a local database, although for durability all entries are recorded in a Kafka
-         * topic that can be read to restore the entries if they are lost.
-         *
-         * @return the factory to create local database key-value stores; never null
-         */
-        LocalDatabaseKeyValueFactory<K, V> localDatabase();
-    }
-
-    /**
-     * The interface used to create in-memory key-value stores.
-     *
-     * @param <K> the type of keys
-     * @param <V> the type of values
-     */
-    public static interface InMemoryKeyValueFactory<K, V> {
-        /**
-         * Limits the in-memory key-value store to hold a maximum number of entries. The default is {@link Integer#MAX_VALUE}, which is
-         * equivalent to not placing a limit on the number of entries.
-         *
-         * @param capacity the maximum capacity of the in-memory cache; should be one less than a power of 2
-         * @return this factory
-         * @throws IllegalArgumentException if the capacity is not positive
-         */
-        InMemoryKeyValueFactory<K, V> maxEntries(int capacity);
-
-        /**
-         * Return the {@link StateStoreSupplier} for the new key-value store.
-         * @return the state store supplier; never null
-         */
-        StateStoreSupplier build();
-    }
-
-    /**
-     * The interface used to create off-heap key-value stores that use a local database.
-     *
-     * @param <K> the type of keys
-     * @param <V> the type of values
-     */
-    public static interface LocalDatabaseKeyValueFactory<K, V> {
-        /**
-         * Return the {@link StateStoreSupplier} for the new key-value store.
-         * @return the state store supplier; never null
-         */
-        StateStoreSupplier build();
-    }
-}

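For reference, the removed factory was meant to be chained fluently; a sketch, assuming a StreamingConfig named config is in scope (hypothetical variable):

    // Off-heap store backed by the RocksDBKeyValueStoreSupplier removed above.
    StateStoreSupplier persistent = Stores.create("page-counts", config)
        .withStringKeys()
        .withLongValues()
        .localDatabase()
        .build();

    // Bounded in-memory store; a capacity below Integer.MAX_VALUE selects the LRU variant.
    StateStoreSupplier bounded = Stores.create("recent-views", config)
        .withKeys(String.class)  // resolved via Serdes.serializer/deserializer
        .withValues(Long.class)
        .inMemory()
        .maxEntries(1023)        // the javadoc suggests one less than a power of 2
        .build();
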
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
deleted file mode 100644
index cf1cfaa..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.streams.kstream.internals.KStreamImpl;
-import org.apache.kafka.streams.processor.TopologyException;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class KStreamBuilderTest {
-
-    @Test(expected = TopologyException.class)
-    public void testFrom() {
-        final KStreamBuilder builder = new KStreamBuilder();
-
-        builder.from("topic-1", "topic-2");
-
-        builder.addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3");
-    }
-
-    @Test
-    public void testNewName() {
-        KStreamBuilder builder = new KStreamBuilder();
-
-        assertEquals("X-0000000000", builder.newName("X-"));
-        assertEquals("Y-0000000001", builder.newName("Y-"));
-        assertEquals("Z-0000000002", builder.newName("Z-"));
-
-        builder = new KStreamBuilder();
-
-        assertEquals("X-0000000000", builder.newName("X-"));
-        assertEquals("Y-0000000001", builder.newName("Y-"));
-        assertEquals("Z-0000000002", builder.newName("Z-"));
-    }
-}

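For reference, the second test pins down the generated-name scheme: a per-builder counter, zero-padded to ten digits and appended to the caller's prefix, restarting at zero for each new builder. A sketch consistent with those assertions (not the KStreamBuilder source):

    int index = 0;
    String name = "X-" + String.format("%010d", index++);   // "X-0000000000"

The first test exercises the same scheme: from("topic-1", "topic-2") consumes the auto-generated name KStreamImpl.SOURCE_NAME + "0000000000", so explicitly registering another source under that name afterwards collides and raises the expected TopologyException.
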
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FilteredIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FilteredIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FilteredIteratorTest.java
deleted file mode 100644
index 405c7c9..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FilteredIteratorTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-public class FilteredIteratorTest {
-
-    @Test
-    public void testFiltering() {
-        List<Integer> list = Arrays.asList(3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5);
-
-        Iterator<String> filtered = new FilteredIterator<String, Integer>(list.iterator()) {
-            protected String filter(Integer i) {
-                if (i % 3 == 0) return i.toString();
-                return null;
-            }
-        };
-
-        List<String> expected = Arrays.asList("3", "9", "6", "3");
-        List<String> result = new ArrayList<String>();
-
-        while (filtered.hasNext()) {
-            result.add(filtered.next());
-        }
-
-        assertEquals(expected, result);
-    }
-
-    @Test
-    public void testEmptySource() {
-        List<Integer> list = new ArrayList<Integer>();
-
-        Iterator<String> filtered = new FilteredIterator<String, Integer>(list.iterator()) {
-            protected String filter(Integer i) {
-                if (i % 3 == 0) return i.toString();
-                return null;
-            }
-        };
-
-        List<String> expected = new ArrayList<String>();
-        List<String> result = new ArrayList<String>();
-
-        while (filtered.hasNext()) {
-            result.add(filtered.next());
-        }
-
-        assertEquals(expected, result);
-    }
-
-    @Test
-    public void testNoMatch() {
-        List<Integer> list = Arrays.asList(3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5);
-
-        Iterator<String> filtered = new FilteredIterator<String, Integer>(list.iterator()) {
-            protected String filter(Integer i) {
-                if (i % 7 == 0) return i.toString();
-                return null;
-            }
-        };
-
-        List<String> expected = new ArrayList<String>();
-        List<String> result = new ArrayList<String>();
-
-        while (filtered.hasNext()) {
-            result.add(filtered.next());
-        }
-
-        assertEquals(expected, result);
-    }
-
-}

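The FilteredIterator under test is not included in this part of the commit. A minimal look-ahead implementation consistent with the tests, where filter() maps each element and returns null to drop it, might look like:

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    abstract class FilteredIterator<T, S> implements Iterator<T> {
        private final Iterator<S> inner;
        private T next; // the next element to return, or null when exhausted

        public FilteredIterator(Iterator<S> inner) {
            this.inner = inner;
            findNext();
        }

        // Map an element to the output type, or return null to skip it.
        protected abstract T filter(S item);

        private void findNext() {
            next = null;
            while (inner.hasNext() && next == null) {
                next = filter(inner.next());
            }
        }

        @Override
        public boolean hasNext() {
            return next != null;
        }

        @Override
        public T next() {
            if (next == null) throw new NoSuchElementException();
            T result = next;
            findNext();
            return result;
        }
    }
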
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
deleted file mode 100644
index 40eba2f..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Test;
-
-import java.lang.reflect.Array;
-
-import static org.junit.Assert.assertEquals;
-
-public class KStreamBranchTest {
-
-    private String topicName = "topic";
-
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testKStreamBranch() {
-        KStreamBuilder builder = new KStreamBuilder();
-
-        Predicate<Integer, String> isEven = new Predicate<Integer, String>() {
-            @Override
-            public boolean test(Integer key, String value) {
-                return (key % 2) == 0;
-            }
-        };
-        Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
-            @Override
-            public boolean test(Integer key, String value) {
-                return (key % 3) == 0;
-            }
-        };
-        Predicate<Integer, String> isOdd = new Predicate<Integer, String>() {
-            @Override
-            public boolean test(Integer key, String value) {
-                return (key % 2) != 0;
-            }
-        };
-
-        final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6};
-
-        KStream<Integer, String> stream;
-        KStream<Integer, String>[] branches;
-        MockProcessorSupplier<Integer, String>[] processors;
-
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
-        branches = stream.branch(isEven, isMultipleOfThree, isOdd);
-
-        assertEquals(3, branches.length);
-
-        processors = (MockProcessorSupplier<Integer, String>[]) Array.newInstance(MockProcessorSupplier.class, branches.length);
-        for (int i = 0; i < branches.length; i++) {
-            processors[i] = new MockProcessorSupplier<>();
-            branches[i].process(processors[i]);
-        }
-
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
-        }
-
-        assertEquals(3, processors[0].processed.size());
-        assertEquals(1, processors[1].processed.size());
-        assertEquals(2, processors[2].processed.size());
-    }
-}

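The assertion counts only add up under first-match routing: key 6 is both even and a multiple of three, yet only processors[0] sees it, so each record goes to the branch of the first predicate that accepts it. A sketch of that dispatch (not the KStreamBranch source):

    for (int key : new int[]{1, 2, 3, 4, 5, 6}) {
        if (key % 2 == 0)      System.out.println(key + " -> branch 0");  // 2, 4, 6
        else if (key % 3 == 0) System.out.println(key + " -> branch 1");  // 3
        else                   System.out.println(key + " -> branch 2");  // 1, 5
    }
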
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
deleted file mode 100644
index d1e5d38..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorSupplier;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class KStreamFilterTest {
-
-    private String topicName = "topic";
-
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
-    private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
-        @Override
-        public boolean test(Integer key, String value) {
-            return (key % 3) == 0;
-        }
-    };
-
-    @Test
-    public void testFilter() {
-        KStreamBuilder builder = new KStreamBuilder();
-        final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
-
-        KStream<Integer, String> stream;
-        MockProcessorSupplier<Integer, String> processor;
-
-        processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
-        stream.filter(isMultipleOfThree).process(processor);
-
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
-        }
-
-        assertEquals(2, processor.processed.size());
-    }
-
-    @Test
-    public void testFilterOut() {
-        KStreamBuilder builder = new KStreamBuilder();
-        final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
-
-        KStream<Integer, String> stream;
-        MockProcessorSupplier<Integer, String> processor;
-
-        processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
-        stream.filterOut(isMultipleOfThree).process(processor);
-
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
-        }
-
-        assertEquals(5, processor.processed.size());
-    }
-}
\ No newline at end of file

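For reference, the multiples of three among the keys 1 through 7 are 3 and 6, so filter(isMultipleOfThree) forwards 2 records and filterOut(isMultipleOfThree) forwards the remaining 5, which is exactly what the two assertions check.
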
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
deleted file mode 100644
index 61b5ccd..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-
-public class KStreamFlatMapTest {
-
-    private String topicName = "topic";
-
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
-    @Test
-    public void testFlatMap() {
-        KStreamBuilder builder = new KStreamBuilder();
-
-        KeyValueMapper<Integer, String, Iterable<KeyValue<String, String>>> mapper =
-            new KeyValueMapper<Integer, String, Iterable<KeyValue<String, String>>>() {
-                @Override
-                public Iterable<KeyValue<String, String>> apply(Integer key, String value) {
-                    ArrayList<KeyValue<String, String>> result = new ArrayList<>();
-                    for (int i = 0; i < key; i++) {
-                        result.add(KeyValue.pair(Integer.toString(key * 10 + i), value));
-                    }
-                    return result;
-                }
-            };
-
-        final int[] expectedKeys = {0, 1, 2, 3};
-
-        KStream<Integer, String> stream;
-        MockProcessorSupplier<String, String> processor;
-
-        processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
-        stream.flatMap(mapper).process(processor);
-
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
-        }
-
-        assertEquals(6, processor.processed.size());
-
-        String[] expected = {"10:V1", "20:V2", "21:V2", "30:V3", "31:V3", "32:V3"};
-
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
-        }
-    }
-
-}
\ No newline at end of file

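For reference, the mapper above emits as many records as the value of the incoming key (zero for key 0, one for key 1, and so on), so the four inputs produce 0 + 1 + 2 + 3 = 6 outputs; the expected strings are the MockProcessorSupplier's key:value renderings of those records.
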
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
deleted file mode 100644
index 66faf07..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Test;
-
-import java.util.ArrayList;
-
-import static org.junit.Assert.assertEquals;
-
-public class KStreamFlatMapValuesTest {
-
-    private String topicName = "topic";
-
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
-    @Test
-    public void testFlatMapValues() {
-        KStreamBuilder builder = new KStreamBuilder();
-
-        ValueMapper<String, Iterable<String>> mapper =
-            new ValueMapper<String, Iterable<String>>() {
-                @Override
-                public Iterable<String> apply(String value) {
-                    ArrayList<String> result = new ArrayList<String>();
-                    result.add(value.toLowerCase());
-                    result.add(value);
-                    return result;
-                }
-            };
-
-        final int[] expectedKeys = {0, 1, 2, 3};
-
-        KStream<Integer, String> stream;
-        MockProcessorSupplier<Integer, String> processor;
-
-        processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
-        stream.flatMapValues(mapper).process(processor);
-
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
-        }
-
-        assertEquals(8, processor.processed.size());
-
-        String[] expected = {"0:v0", "0:V0", "1:v1", "1:V1", "2:v2", "2:V2", "3:v3", "3:V3"};
-
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
-        }
-    }
-}
\ No newline at end of file

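For reference, each input record fans out to two outputs under the same key (the lowercased value followed by the original), so the four inputs yield 4 x 2 = 8 processed records, matching the expected key:value strings.
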
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
deleted file mode 100644
index d924a34..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.UnlimitedWindowDef;
-import org.junit.Test;
-
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class KStreamImplTest {
-
-    @Test
-    public void testNumProcesses() {
-        final Deserializer<String> deserializer = new StringDeserializer();
-        final KStreamBuilder builder = new KStreamBuilder();
-
-        KStream<String, String> source1 = builder.from(deserializer, deserializer, "topic-1", "topic-2");
-
-        KStream<String, String> source2 = builder.from(deserializer, deserializer, "topic-3", "topic-4");
-
-        KStream<String, String> stream1 =
-            source1.filter(new Predicate<String, String>() {
-                @Override
-                public boolean test(String key, String value) {
-                    return true;
-                }
-            }).filterOut(new Predicate<String, String>() {
-                @Override
-                public boolean test(String key, String value) {
-                    return false;
-                }
-            });
-
-        KStream<String, Integer> stream2 = stream1.mapValues(new ValueMapper<String, Integer>() {
-            @Override
-            public Integer apply(String value) {
-                return new Integer(value);
-            }
-        });
-
-        KStream<String, Integer> stream3 = source2.flatMapValues(new ValueMapper<String, Iterable<Integer>>() {
-            @Override
-            public Iterable<Integer> apply(String value) {
-                return Collections.singletonList(new Integer(value));
-            }
-        });
-
-        KStream<String, Integer>[] streams2 = stream2.branch(
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(String key, Integer value) {
-                    return (value % 2) == 0;
-                }
-            },
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(String key, Integer value) {
-                    return true;
-                }
-            }
-        );
-
-        KStream<String, Integer>[] streams3 = stream3.branch(
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(String key, Integer value) {
-                    return (value % 2) == 0;
-                }
-            },
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(String key, Integer value) {
-                    return true;
-                }
-            }
-        );
-
-        KStream<String, Integer> stream4 = streams2[0].with(new UnlimitedWindowDef<String, Integer>("window"))
-            .join(streams3[0].with(new UnlimitedWindowDef<String, Integer>("window")), new ValueJoiner<Integer, Integer, Integer>() {
-                @Override
-                public Integer apply(Integer value1, Integer value2) {
-                    return value1 + value2;
-                }
-            });
-
-        KStream<String, Integer> stream5 = streams2[1].with(new UnlimitedWindowDef<String, Integer>("window"))
-            .join(streams3[1].with(new UnlimitedWindowDef<String, Integer>("window")), new ValueJoiner<Integer, Integer, Integer>() {
-                @Override
-                public Integer apply(Integer value1, Integer value2) {
-                    return value1 + value2;
-                }
-            });
-
-        stream4.to("topic-5");
-
-        stream5.through("topic-6").process(new MockProcessorSupplier<>());
-
-        assertEquals(2 + // sources
-            2 + // stream1
-            1 + // stream2
-            1 + // stream3
-            1 + 2 + // streams2
-            1 + 2 + // streams3
-            2 + 3 + // stream4
-            2 + 3 + // stream5
-            1 + // to
-            2 + // through
-            1, // process
-            builder.build(null).processors().size());
-    }
-}
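
For anyone tallying the assertion above: the sum works out to 26 processors — 2 (sources) + 2 (stream1's filter and filterOut) + 1 (stream2's mapValues) + 1 (stream3's flatMapValues) + (1 + 2) each for the two branches (streams2, streams3) + (2 + 3) each for the two windowed joins (stream4, stream5) + 1 (to) + 2 (through) + 1 (process).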

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java
deleted file mode 100644
index 12bed17..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStreamWindowed;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.UnlimitedWindowDef;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
-public class KStreamJoinTest {
-
-    private String topic1 = "topic1";
-    private String topic2 = "topic2";
-    private String dummyTopic = "dummyTopic";
-
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
-    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
-        @Override
-        public String apply(String value1, String value2) {
-            return value1 + "+" + value2;
-        }
-    };
-
-    private ValueMapper<String, String> valueMapper = new ValueMapper<String, String>() {
-        @Override
-        public String apply(String value) {
-            return "#" + value;
-        }
-    };
-
-    private ValueMapper<String, Iterable<String>> valueMapper2 = new ValueMapper<String, Iterable<String>>() {
-        @Override
-        public Iterable<String> apply(String value) {
-            return (Iterable<String>) Utils.mkSet(value);
-        }
-    };
-
-    private KeyValueMapper<Integer, String, KeyValue<Integer, String>> keyValueMapper =
-        new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
-            @Override
-            public KeyValue<Integer, String> apply(Integer key, String value) {
-                return KeyValue.pair(key, value);
-            }
-        };
-
-    private KeyValueMapper<Integer, String, KeyValue<Integer, Iterable<String>>> keyValueMapper2 =
-        new KeyValueMapper<Integer, String, KeyValue<Integer, Iterable<String>>>() {
-            @Override
-            public KeyValue<Integer, Iterable<String>> apply(Integer key, String value) {
-                return KeyValue.pair(key, (Iterable<String>) Utils.mkSet(value));
-            }
-        };
-
-
-    @Test
-    public void testJoin() {
-        KStreamBuilder builder = new KStreamBuilder();
-
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
-
-        KStream<Integer, String> stream1;
-        KStream<Integer, String> stream2;
-        KStream<Integer, String> dummyStream;
-        KStreamWindowed<Integer, String> windowed1;
-        KStreamWindowed<Integer, String> windowed2;
-        MockProcessorSupplier<Integer, String> processor;
-        String[] expected;
-
-        processor = new MockProcessorSupplier<>();
-        stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
-        stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
-        dummyStream = builder.from(keyDeserializer, valDeserializer, dummyTopic);
-        windowed1 = stream1.with(new UnlimitedWindowDef<Integer, String>("window1"));
-        windowed2 = stream2.with(new UnlimitedWindowDef<Integer, String>("window2"));
-
-        windowed1.join(windowed2, joiner).process(processor);
-
-        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
-
-        assertEquals(1, copartitionGroups.size());
-        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
-
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
-        driver.setTime(0L);
-
-        // push two items to the main stream. the other stream's window is empty
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-        }
-
-        assertEquals(0, processor.processed.size());
-
-        // push two items to the other stream. the main stream's window has two items
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
-
-        assertEquals(2, processor.processed.size());
-
-        expected = new String[]{"0:X0+Y0", "1:X1+Y1"};
-
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
-        }
-
-        processor.processed.clear();
-
-        // push all items to the main stream. the other stream's window holds only keys 0 and 1, so this should produce two items.
-
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-        }
-
-        assertEquals(2, processor.processed.size());
-
-        expected = new String[]{"0:X0+Y0", "1:X1+Y1"};
-
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
-        }
-
-        processor.processed.clear();
-
-        // the main stream's window now holds the previous two items plus all four new ones,
-        // so keys 0 and 1 are duplicated.
-
-        // push all items to the other stream. this should produce 6 items
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
-
-        assertEquals(6, processor.processed.size());
-
-        expected = new String[]{"0:X0+Y0", "0:X0+Y0", "1:X1+Y1", "1:X1+Y1", "2:X2+Y2", "3:X3+Y3"};
-
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
-        }
-    }
-
-    @Test(expected = KafkaException.class)
-    public void testNotJoinable() {
-        KStreamBuilder builder = new KStreamBuilder();
-
-        KStream<Integer, String> stream1;
-        KStream<Integer, String> stream2;
-        KStreamWindowed<Integer, String> windowed1;
-        KStreamWindowed<Integer, String> windowed2;
-        MockProcessorSupplier<Integer, String> processor;
-
-        processor = new MockProcessorSupplier<>();
-        stream1 = builder.from(keyDeserializer, valDeserializer, topic1).map(keyValueMapper);
-        stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
-        windowed1 = stream1.with(new UnlimitedWindowDef<Integer, String>("window1"));
-        windowed2 = stream2.with(new UnlimitedWindowDef<Integer, String>("window2"));
-
-        windowed1.join(windowed2, joiner).process(processor);
-    }
-}
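
As a sanity check on the join counts asserted above, here is a dependency-free sketch (ours, not part of the commit; the class name and main() driver are hypothetical) that simulates the unlimited-window join semantics of testJoin with plain HashMaps:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JoinCountSketch {
    static Map<Integer, List<String>> window1 = new HashMap<>();
    static Map<Integer, List<String>> window2 = new HashMap<>();
    static List<String> joined = new ArrayList<>();

    // push a record to one side: join it against the other side's window, then store it
    static void push(Map<Integer, List<String>> mine, Map<Integer, List<String>> other,
                     int key, String value, boolean leftSide) {
        for (String match : other.getOrDefault(key, List.of())) {
            joined.add(key + ":" + (leftSide ? value + "+" + match : match + "+" + value));
        }
        mine.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) push(window1, window2, i, "X" + i, true);   // 0 joins: window2 is empty
        for (int i = 0; i < 2; i++) push(window2, window1, i, "Y" + i, false);  // 2 joins
        for (int i = 0; i < 4; i++) push(window1, window2, i, "X" + i, true);   // 2 joins: window2 holds keys 0, 1
        int before = joined.size();
        for (int i = 0; i < 4; i++) push(window2, window1, i, "Y" + i, false);  // 6 joins: keys 0, 1 doubled in window1
        System.out.println(joined.subList(before, joined.size()));
        // prints [0:X0+Y0, 0:X0+Y0, 1:X1+Y1, 1:X1+Y1, 2:X2+Y2, 3:X3+Y3]
    }
}

The final round yields the duplicated "0:X0+Y0" and "1:X1+Y1" entries plus one record each for keys 2 and 3, matching the six-element expected array in the test.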

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
deleted file mode 100644
index 2ae8a97..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class KStreamMapTest {
-
-    private String topicName = "topic";
-
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
-    @Test
-    public void testMap() {
-        KStreamBuilder builder = new KStreamBuilder();
-
-        KeyValueMapper<Integer, String, KeyValue<String, Integer>> mapper =
-            new KeyValueMapper<Integer, String, KeyValue<String, Integer>>() {
-                @Override
-                public KeyValue<String, Integer> apply(Integer key, String value) {
-                    return KeyValue.pair(value, key);
-                }
-            };
-
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
-
-        KStream<Integer, String> stream;
-        MockProcessorSupplier<String, Integer> processor;
-
-        processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
-        stream.map(mapper).process(processor);
-
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
-        }
-
-        assertEquals(4, processor.processed.size());
-
-        String[] expected = new String[]{"V0:0", "V1:1", "V2:2", "V3:3"};
-
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
deleted file mode 100644
index f830c00..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class KStreamMapValuesTest {
-
-    private String topicName = "topic";
-
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
-    @Test
-    public void testMapValues() {
-        KStreamBuilder builder = new KStreamBuilder();
-
-        ValueMapper<String, Integer> mapper =
-            new ValueMapper<String, Integer>() {
-                @Override
-                public Integer apply(String value) {
-                    return value.length();
-                }
-            };
-
-        final int[] expectedKeys = {1, 10, 100, 1000};
-
-        KStream<Integer, String> stream;
-        MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
-        stream.mapValues(mapper).process(processor);
-
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], Integer.toString(expectedKeys[i]));
-        }
-
-        assertEquals(4, processor.processed.size());
-
-        String[] expected = {"1:1", "10:2", "100:3", "1000:4"};
-
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
deleted file mode 100644
index e397dd1..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class KStreamTransformTest {
-
-    private String topicName = "topic";
-
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private IntegerDeserializer valDeserializer = new IntegerDeserializer();
-
-    @Test
-    public void testTransform() {
-        KStreamBuilder builder = new KStreamBuilder();
-
-        TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>> transformerSupplier =
-            new TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>>() {
-                public Transformer<Integer, Integer, KeyValue<Integer, Integer>> get() {
-                    return new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
-
-                        private int total = 0;
-
-                        @Override
-                        public void init(ProcessorContext context) {
-                        }
-
-                        @Override
-                        public KeyValue<Integer, Integer> transform(Integer key, Integer value) {
-                            total += value;
-                            return KeyValue.pair(key * 2, total);
-                        }
-
-                        @Override
-                        public void punctuate(long timestamp) {
-                        }
-
-                        @Override
-                        public void close() {
-                        }
-                    };
-                }
-            };
-
-        final int[] expectedKeys = {1, 10, 100, 1000};
-
-        KStream<Integer, Integer> stream;
-        MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
-        stream.transform(transformerSupplier).process(processor);
-
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10);
-        }
-
-        assertEquals(4, processor.processed.size());
-
-        String[] expected = {"2:10", "20:110", "200:1110", "2000:11110"};
-
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
-        }
-    }
-
-}
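
To trace the expected strings above: the driver feeds (key, value) pairs (1, 10), (10, 100), (100, 1000), (1000, 10000); the transformer doubles each key and emits the running total of the values, so the totals 10, 110, 1110, 11110 produce "2:10", "20:110", "200:1110", "2000:11110".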

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
deleted file mode 100644
index c5c9b39..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.ValueTransformer;
-import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class KStreamTransformValuesTest {
-
-    private String topicName = "topic";
-
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private IntegerDeserializer valDeserializer = new IntegerDeserializer();
-
-    @Test
-    public void testTransform() {
-        KStreamBuilder builder = new KStreamBuilder();
-
-        ValueTransformerSupplier<Integer, Integer> valueTransformerSupplier =
-            new ValueTransformerSupplier<Integer, Integer>() {
-                public ValueTransformer<Integer, Integer> get() {
-                    return new ValueTransformer<Integer, Integer>() {
-
-                        private int total = 0;
-
-                        @Override
-                        public void init(ProcessorContext context) {
-                        }
-
-                        @Override
-                        public Integer transform(Integer value) {
-                            total += value;
-                            return total;
-                        }
-
-                        @Override
-                        public void punctuate(long timestamp) {
-                        }
-
-                        @Override
-                        public void close() {
-                        }
-                    };
-                }
-            };
-
-        final int[] expectedKeys = {1, 10, 100, 1000};
-
-        KStream<Integer, Integer> stream;
-        MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
-        stream.transformValues(valueTransformerSupplier).process(processor);
-
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10);
-        }
-
-        assertEquals(4, processor.processed.size());
-
-        String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
-
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java
deleted file mode 100644
index c3dc7e0..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.WindowSupplier;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.UnlimitedWindowDef;
-import org.junit.Test;
-
-import java.util.Iterator;
-
-import static org.junit.Assert.assertEquals;
-
-public class KStreamWindowedTest {
-
-    private String topicName = "topic";
-    private String windowName = "MyWindow";
-
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
-    @Test
-    public void testWindowedStream() {
-        KStreamBuilder builder = new KStreamBuilder();
-
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
-
-        KStream<Integer, String> stream;
-        WindowSupplier<Integer, String> windowSupplier;
-
-        windowSupplier = new UnlimitedWindowDef<>(windowName);
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
-        stream.with(windowSupplier);
-
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
-        Window<Integer, String> window = (Window<Integer, String>) driver.getStateStore(windowName);
-        driver.setTime(0L);
-
-        // two items in the window
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
-        }
-
-        assertEquals(1, countItem(window.find(0, 0L)));
-        assertEquals(1, countItem(window.find(1, 0L)));
-        assertEquals(0, countItem(window.find(2, 0L)));
-        assertEquals(0, countItem(window.find(3, 0L)));
-
-        // the window now holds the previous two items plus all four new ones, so keys 0 and 1 appear twice
-
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
-
-        assertEquals(2, countItem(window.find(0, 0L)));
-        assertEquals(2, countItem(window.find(1, 0L)));
-        assertEquals(1, countItem(window.find(2, 0L)));
-        assertEquals(1, countItem(window.find(3, 0L)));
-    }
-
-
-    private <T> int countItem(Iterator<T> iter) {
-        int i = 0;
-        while (iter.hasNext()) {
-            i++;
-            iter.next();
-        }
-        return i;
-    }
-}

