kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [3/5] kafka git commit: KAFKA-3121: Remove aggregatorSupplier and add Reduce functions
Date Thu, 21 Jan 2016 00:10:49 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
new file mode 100644
index 0000000..8c3b437
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Serdes;
+
+/**
+ * A {@link StateStoreSupplier} that creates {@link org.apache.kafka.streams.state.KeyValueStore}
+ * instances whose entries are kept in a local RocksDB database. Every store handed out by
+ * {@link #get()} is a {@link RocksDBStore} wrapped in a {@link MeteredKeyValueStore}.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
+public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
+
+    private final String name;
+    // Parameterized rather than raw so that the store construction in get() is type-checked.
+    private final Serdes<K, V> serdes;
+    private final Time time;
+
+    /**
+     * @param name   the name given to every store created by this supplier
+     * @param serdes the serializers/deserializers passed to the created stores
+     * @param time   the time source passed to the metering wrapper
+     */
+    public RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
+        this.name = name;
+        this.serdes = serdes;
+        this.time = time;
+    }
+
+    /**
+     * @return the name of the stores created by this supplier
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Builds a new metered, RocksDB-backed key-value store. A fresh instance is created on every call.
+     *
+     * @return the newly created store
+     */
+    public StateStore get() {
+        return new MeteredKeyValueStore<>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
new file mode 100644
index 0000000..8a600f9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Serdes;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
+
+    private static final int TTL_NOT_USED = -1;
+
+    // TODO: these values should be configurable
+    private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
+    private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
+    private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
+    private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
+    private static final long BLOCK_SIZE = 4096L;
+    private static final int TTL_SECONDS = TTL_NOT_USED;
+    private static final int MAX_WRITE_BUFFERS = 3;
+    private static final String DB_FILE_DIR = "rocksdb";
+
+    private final String name;
+
+    private final Options options;
+    private final WriteOptions wOptions;
+    private final FlushOptions fOptions;
+
+    private Serdes<K, V> serdes;
+    private ProcessorContext context;
+    protected File dbDir;
+    private RocksDB db;
+
+    /**
+     * Creates a store with the given name and prepares the RocksDB option objects.
+     * The database itself is not opened here; that happens in {@link #init(ProcessorContext)}.
+     *
+     * @param name   the store name, also used as the database directory name
+     * @param serdes the serializers/deserializers used to convert keys and values to raw bytes
+     */
+    public RocksDBStore(String name, Serdes<K, V> serdes) {
+        this.name = name;
+        this.serdes = serdes;
+
+        // write options: the write-ahead log is disabled for every put/remove
+        this.wOptions = new WriteOptions();
+        this.wOptions.setDisableWAL(true);
+
+        // flush options: flush requests are issued with wait-for-flush enabled
+        this.fOptions = new FlushOptions();
+        this.fOptions.setWaitForFlush(true);
+
+        // block-based table layout shared by all files of this database
+        final BlockBasedTableConfig blockTable = new BlockBasedTableConfig();
+        blockTable.setBlockCacheSize(BLOCK_CACHE_SIZE);
+        blockTable.setBlockSize(BLOCK_SIZE);
+
+        // database options: reuse an existing database directory, create it when absent
+        this.options = new Options();
+        this.options.setTableFormatConfig(blockTable);
+        this.options.setWriteBufferSize(WRITE_BUFFER_SIZE);
+        this.options.setCompressionType(COMPRESSION_TYPE);
+        this.options.setCompactionStyle(COMPACTION_STYLE);
+        this.options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
+        this.options.setCreateIfMissing(true);
+        this.options.setErrorIfExists(false);
+    }
+
+    /**
+     * Initializes the serdes with the given context and opens the RocksDB database located at
+     * {@code <stateDir>/rocksdb/<store name>}.
+     *
+     * @param context the processor context providing the state directory
+     */
+    public void init(ProcessorContext context) {
+        serdes.init(context);
+        this.context = context;
+
+        final File rocksDbRoot = new File(context.stateDir(), DB_FILE_DIR);
+        this.dbDir = new File(rocksDbRoot, name);
+        this.db = openDB(dbDir, options, TTL_SECONDS);
+    }
+
+    /**
+     * Opens (creating the parent directories if necessary) the RocksDB database in {@code dir}.
+     *
+     * @param dir     the database directory
+     * @param options the RocksDB options to open the database with
+     * @param ttl     the time-to-live in seconds, or {@code TTL_NOT_USED}; any other value is rejected
+     * @return the opened database
+     * @throws KafkaException if a TTL is requested or RocksDB fails to open the database
+     */
+    private RocksDB openDB(File dir, Options options, int ttl) {
+        if (ttl != TTL_NOT_USED) {
+            // TODO: support TTL with change log?
+            // return TtlDB.open(options, dir.toString(), ttl, false);
+            throw new KafkaException("Change log is not supported for store " + this.name + " since it is TTL based.");
+        }
+
+        try {
+            dir.getParentFile().mkdirs();
+            return RocksDB.open(options, dir.toString());
+        } catch (RocksDBException e) {
+            // TODO: this needs to be handled more accurately
+            throw new KafkaException("Error opening store " + this.name + " at location " + dir.toString(), e);
+        }
+    }
+
+    /**
+     * @return the name this store was constructed with
+     */
+    @Override
+    public String name() {
+        return this.name;
+    }
+
+    /**
+     * Reports this store as persistent: its contents live in a RocksDB database on local disk
+     * under the state directory, the database is opened with create-if-missing and without
+     * error-if-exists (so existing data is reused), and {@link #close()} flushes before closing.
+     *
+     * @return {@code true}
+     */
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    /**
+     * Looks up the value stored under the given key. The raw bytes returned by RocksDB are
+     * handed to the value deserializer as-is.
+     *
+     * @param key the key to look up
+     * @return the deserialized value
+     * @throws KafkaException if RocksDB reports an error during the lookup
+     */
+    @Override
+    public V get(K key) {
+        final byte[] rawKey = serdes.rawKey(key);
+        final byte[] rawValue;
+        try {
+            rawValue = this.db.get(rawKey);
+        } catch (RocksDBException e) {
+            // TODO: this needs to be handled more accurately
+            throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.name, e);
+        }
+        return serdes.valueFrom(rawValue);
+    }
+
+    /**
+     * Writes the given value under the given key; a {@code null} value removes the key instead.
+     *
+     * @param key   the key to write or remove
+     * @param value the value to store, or {@code null} to delete the entry
+     * @throws KafkaException if RocksDB reports an error during the write
+     */
+    @Override
+    public void put(K key, V value) {
+        final boolean isDelete = value == null;
+        try {
+            final byte[] rawKey = serdes.rawKey(key);
+            if (isDelete) {
+                db.remove(wOptions, rawKey);
+                return;
+            }
+            db.put(wOptions, rawKey, serdes.rawValue(value));
+        } catch (RocksDBException e) {
+            // TODO: this needs to be handled more accurately
+            throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.name, e);
+        }
+    }
+
+    /**
+     * Writes every entry of the list, in list order, through {@link #put(Object, Object)}.
+     *
+     * @param entries the key-value pairs to write
+     */
+    @Override
+    public void putAll(List<Entry<K, V>> entries) {
+        for (Entry<K, V> pair : entries) {
+            put(pair.key(), pair.value());
+        }
+    }
+
+    /**
+     * Removes the entry for the given key and returns the value it held before removal.
+     * The lookup and the removal are two separate store operations.
+     *
+     * @param key the key to remove
+     * @return the value read for the key just before it was removed
+     */
+    @Override
+    public V delete(K key) {
+        final V previous = get(key);
+        // a null value is the removal signal understood by put()
+        put(key, null);
+        return previous;
+    }
+
+    /**
+     * Returns an iterator over the entries whose raw key is at or after the raw form of
+     * {@code from} and strictly before the raw form of {@code to}, in RocksDB iteration order.
+     * The caller is responsible for closing the returned iterator.
+     *
+     * @param from the first key of the range
+     * @param to   the key at which iteration stops (exclusive)
+     * @return an iterator over the matching entries
+     */
+    @Override
+    public KeyValueIterator<K, V> range(K from, K to) {
+        final RocksIterator rawIterator = db.newIterator();
+        return new RocksDBRangeIterator<K, V>(rawIterator, serdes, from, to);
+    }
+
+    /**
+     * Returns an iterator over every entry of the store, positioned at the first entry.
+     * The caller is responsible for closing the returned iterator.
+     *
+     * @return an iterator over all entries
+     */
+    @Override
+    public KeyValueIterator<K, V> all() {
+        final RocksIterator rawIterator = db.newIterator();
+        rawIterator.seekToFirst();
+        return new RocksDbIterator<K, V>(rawIterator, serdes);
+    }
+
+    /**
+     * Flushes the database using the flush options prepared in the constructor
+     * (wait-for-flush enabled).
+     *
+     * @throws KafkaException if RocksDB reports an error during the flush
+     */
+    @Override
+    public void flush() {
+        try {
+            db.flush(fOptions);
+        } catch (RocksDBException e) {
+            // TODO: this needs to be handled more accurately
+            throw new KafkaException("Error while executing flush from store " + this.name, e);
+        }
+    }
+
+    /**
+     * Flushes pending writes and then closes the database.
+     */
+    @Override
+    public void close() {
+        // flush first: writes are issued with the WAL disabled
+        flush();
+        // NOTE(review): options, wOptions and fOptions are not disposed here; confirm whether
+        // they hold native resources that should be released once the store is closed for good.
+        db.close();
+    }
+
+    /**
+     * Adapts a {@link RocksIterator} to the {@link KeyValueIterator} interface, deserializing
+     * keys and values with the supplied serdes. The wrapped iterator must already be positioned
+     * by whoever constructs this object.
+     */
+    private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
+        private final RocksIterator iter;
+        private final Serdes<K, V> serdes;
+
+        public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
+            this.iter = iter;
+            this.serdes = serdes;
+        }
+
+        /** Returns the raw key at the current position without advancing. */
+        protected byte[] peekRawKey() {
+            return iter.key();
+        }
+
+        /** Deserializes the key and value at the current position without advancing. */
+        protected Entry<K, V> getEntry() {
+            final K key = serdes.keyFrom(iter.key());
+            final V value = serdes.valueFrom(iter.value());
+            return new Entry<>(key, value);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.isValid();
+        }
+
+        /**
+         * Returns the entry at the current position and advances the underlying iterator.
+         *
+         * @throws NoSuchElementException if {@link #hasNext()} is false
+         */
+        @Override
+        public Entry<K, V> next() {
+            if (hasNext()) {
+                final Entry<K, V> current = this.getEntry();
+                iter.next();
+                return current;
+            }
+            throw new NoSuchElementException();
+        }
+
+        /** Always fails: removal through the iterator is not supported. */
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("RocksDB iterator does not support remove");
+        }
+
+        /** Disposes the underlying RocksDB iterator. */
+        @Override
+        public void close() {
+            iter.dispose();
+        }
+
+    }
+
+    /**
+     * Orders byte arrays lexicographically, treating each byte as an unsigned value.
+     * When one array is a prefix of the other, the shorter array sorts first.
+     */
+    private static class LexicographicComparator implements Comparator<byte[]> {
+
+        @Override
+        public int compare(byte[] left, byte[] right) {
+            final int commonLength = Math.min(left.length, right.length);
+            for (int pos = 0; pos < commonLength; pos++) {
+                // mask to 0..255 so that bytes >= 0x80 compare as larger, not as negative
+                final int difference = (left[pos] & 0xff) - (right[pos] & 0xff);
+                if (difference != 0) {
+                    return difference;
+                }
+            }
+            return left.length - right.length;
+        }
+    }
+
+    /**
+     * A {@link RocksDbIterator} that starts at the raw form of {@code from} and stops before the
+     * first raw key that is not lexicographically smaller than the raw form of {@code to}.
+     */
+    private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
+        // RocksDB's JNI interface does not expose getters/setters that allow the
+        // comparator to be pluggable, and the default is lexicographic, so it's
+        // safe to just force lexicographic comparator here for now.
+        private final Comparator<byte[]> comparator = new LexicographicComparator();
+        // serialized upper bound (exclusive) of the range
+        byte[] rawToKey;
+
+        public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
+                                    K from, K to) {
+            super(iter, serdes);
+            final byte[] rawFromKey = serdes.rawKey(from);
+            iter.seek(rawFromKey);
+            this.rawToKey = serdes.rawKey(to);
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (!super.hasNext()) {
+                return false;
+            }
+            // still inside the range only while the current key sorts strictly before the bound
+            return comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
new file mode 100644
index 0000000..933ed91
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.WindowStoreUtil;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.SimpleTimeZone;
+
+public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
+
+    public static final long MIN_SEGMENT_INTERVAL = 60 * 1000; // one minute
+
+    private static final long USE_CURRENT_TIMESTAMP = -1L;
+
+    /**
+     * One time slice of the window store: a {@link RocksDBStore} over raw byte keys and values,
+     * tagged with the id of the segment it holds.
+     */
+    private static class Segment extends RocksDBStore<byte[], byte[]> {
+        // segment id, i.e. timestamp / segmentInterval of the records stored here
+        public final long id;
+
+        Segment(String name, long id) {
+            super(name, WindowStoreUtil.INNER_SERDES);
+            this.id = id;
+        }
+
+        /** Deletes this segment's database directory via {@code Utils.delete}. */
+        public void destroy() {
+            Utils.delete(dbDir);
+        }
+    }
+
+    /**
+     * Iterates over the results of a window fetch by concatenating the per-segment range
+     * iterators in array order. Keys are reduced to their timestamp and values are
+     * deserialized with the supplied serdes.
+     */
+    private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {
+        private final Serdes<?, V> serdes;
+        private final KeyValueIterator<byte[], byte[]>[] iterators;
+        // position of the segment iterator currently being drained
+        private int index = 0;
+
+        /** Creates an iterator with no underlying segment iterators, i.e. an empty result. */
+        RocksDBWindowStoreIterator(Serdes<?, V> serdes) {
+            this(serdes, WindowStoreUtil.NO_ITERATORS);
+        }
+
+        RocksDBWindowStoreIterator(Serdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) {
+            this.serdes = serdes;
+            this.iterators = iterators;
+        }
+
+        /**
+         * Skips over exhausted segment iterators and reports whether any entry is left.
+         */
+        @Override
+        public boolean hasNext() {
+            while (index < iterators.length) {
+                if (iterators[index].hasNext())
+                    return true;
+
+                index++;
+            }
+            return false;
+        }
+
+        /**
+         * Returns the next (timestamp, value) pair.
+         *
+         * @throws NoSuchElementException if no entry is left in any segment iterator
+         */
+        @Override
+        public KeyValue<Long, V> next() {
+            // Go through hasNext() so that exhausted segment iterators are skipped even when the
+            // caller invokes next() without calling hasNext() first; otherwise entries held by
+            // later segments would be unreachable and the exhausted iterator would throw.
+            if (!hasNext())
+                throw new NoSuchElementException();
+
+            Entry<byte[], byte[]> entry = iterators[index].next();
+
+            return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(entry.key()),
+                                  serdes.valueFrom(entry.value()));
+        }
+
+        /** Delegates to the segment iterator currently being drained, if any. */
+        @Override
+        public void remove() {
+            if (index < iterators.length)
+                iterators[index].remove();
+        }
+
+        /** Closes every underlying segment iterator, including already exhausted ones. */
+        @Override
+        public void close() {
+            for (KeyValueIterator<byte[], byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    private final String name;
+    private final long segmentInterval;
+    private final boolean retainDuplicates;
+    private final Segment[] segments;
+    private final Serdes<K, V> serdes;
+    private final SimpleDateFormat formatter;
+
+    private ProcessorContext context;
+    private long currentSegmentId = -1L;
+    private int seqnum = 0;
+
+    /**
+     * Creates a window store that spreads its records over {@code numSegments} RocksDB segments.
+     *
+     * @param name             the store name, used as the prefix of every segment name
+     * @param retentionPeriod  the retention period, in the same unit as record timestamps
+     * @param numSegments      the number of segment slots; must be at least 2
+     * @param retainDuplicates whether a sequence number is advanced on every put so that
+     *                         records with the same key and timestamp get distinct binary keys
+     * @param serdes           the serializers/deserializers for keys and values
+     * @throws IllegalArgumentException if {@code numSegments} is smaller than 2
+     */
+    public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes) {
+        // The interval below divides by (numSegments - 1) and segment slots are addressed modulo
+        // numSegments, so fewer than two segments can never work; fail early with a clear message.
+        if (numSegments < 2)
+            throw new IllegalArgumentException("numSegments must be at least 2, but was " + numSegments);
+
+        this.name = name;
+
+        // The segment interval must be greater than MIN_SEGMENT_INTERVAL
+        this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
+
+        this.segments = new Segment[numSegments];
+        this.serdes = serdes;
+
+        this.retainDuplicates = retainDuplicates;
+
+        // Create a date formatter. Formatted timestamps are used as segment name suffixes
+        this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
+        this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT"));
+    }
+
+    /**
+     * @return the name this window store was constructed with
+     */
+    @Override
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Remembers the processor context. No segment is opened here; segments are created and
+     * initialized with this context on demand when records are written.
+     */
+    @Override
+    public void init(ProcessorContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Reports this window store as persistent.
+     *
+     * @return {@code true}
+     */
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    /**
+     * Flushes every segment that currently exists.
+     */
+    @Override
+    public void flush() {
+        for (int slot = 0; slot < segments.length; slot++) {
+            final Segment live = segments[slot];
+            if (live == null)
+                continue; // empty slot
+            live.flush();
+        }
+    }
+
+    /**
+     * Closes every segment that currently exists.
+     */
+    @Override
+    public void close() {
+        for (int slot = 0; slot < segments.length; slot++) {
+            final Segment live = segments[slot];
+            if (live == null)
+                continue; // empty slot
+            live.close();
+        }
+    }
+
+    /**
+     * Stores the value under the key using the processor context's current timestamp.
+     */
+    @Override
+    public void put(K key, V value) {
+        putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP);
+    }
+
+    /**
+     * Stores the value under the key at the given timestamp. Passing {@code -1} as the timestamp
+     * selects the processor context's current timestamp instead.
+     */
+    @Override
+    public void put(K key, V value, long timestamp) {
+        putAndReturnInternalKey(key, value, timestamp);
+    }
+
+    /**
+     * Stores the value under the key and returns the binary key it was written with.
+     *
+     * @param key   the record key
+     * @param value the record value
+     * @param t     the record timestamp, or {@code USE_CURRENT_TIMESTAMP} to take it from the context
+     * @return the binary key used for the write, or {@code null} when the record is older than
+     *         the retained segments and was therefore dropped
+     */
+    @Override
+    public byte[] putAndReturnInternalKey(K key, V value, long t) {
+        final long timestamp;
+        if (t == USE_CURRENT_TIMESTAMP) {
+            timestamp = context.timestamp();
+        } else {
+            timestamp = t;
+        }
+
+        final long segmentId = segmentId(timestamp);
+
+        // A new segment will be created. Clean up old segments first.
+        if (segmentId > currentSegmentId) {
+            currentSegmentId = segmentId;
+            cleanup();
+        }
+
+        // Records outside the retention period are not stored.
+        if (segmentId <= currentSegmentId - segments.length) {
+            return null;
+        }
+
+        if (retainDuplicates) {
+            // advance the sequence number, keeping it non-negative on wrap-around
+            seqnum = (seqnum + 1) & 0x7FFFFFFF;
+        }
+        final byte[] binaryKey = WindowStoreUtil.toBinaryKey(key, timestamp, seqnum, serdes);
+        getSegment(segmentId).put(binaryKey, serdes.rawValue(value));
+        return binaryKey;
+    }
+
+    /**
+     * Writes an already serialized record. The segment is chosen from the timestamp embedded in
+     * the binary key; records older than the retained segments are silently dropped.
+     *
+     * @param binaryKey   the binary key, which carries the record timestamp
+     * @param binaryValue the serialized value
+     */
+    @Override
+    public void putInternal(byte[] binaryKey, byte[] binaryValue) {
+        final long recordTimestamp = WindowStoreUtil.timestampFromBinaryKey(binaryKey);
+        final long segmentId = segmentId(recordTimestamp);
+
+        // A new segment will be created. Clean up old segments first.
+        if (segmentId > currentSegmentId) {
+            currentSegmentId = segmentId;
+            cleanup();
+        }
+
+        // Records outside the retention period are not stored.
+        if (segmentId <= currentSegmentId - segments.length) {
+            return;
+        }
+        getSegment(segmentId).put(binaryKey, binaryValue);
+    }
+
+    /**
+     * Reads the serialized value stored under the given binary key.
+     *
+     * @param binaryKey the binary key, which carries the record timestamp
+     * @return the value read from the matching segment, or {@code null} when the slot for that
+     *         timestamp is empty or currently holds a different segment
+     */
+    @Override
+    public byte[] getInternal(byte[] binaryKey) {
+        final long recordTimestamp = WindowStoreUtil.timestampFromBinaryKey(binaryKey);
+        final long segmentId = segmentId(recordTimestamp);
+
+        final Segment candidate = segments[(int) (segmentId % segments.length)];
+        final boolean holdsRequestedSegment = candidate != null && candidate.id == segmentId;
+
+        return holdsRequestedSegment ? candidate.get(binaryKey) : null;
+    }
+
+    /**
+     * Returns an iterator over all values stored for the key with a timestamp in
+     * {@code [timeFrom, timeTo]}, visiting segments in ascending segment-id order.
+     * The caller is responsible for closing the returned iterator.
+     *
+     * @param key      the record key
+     * @param timeFrom the lower timestamp bound (inclusive); may be negative
+     * @param timeTo   the upper timestamp bound (inclusive)
+     * @return an iterator over the matching (timestamp, value) pairs, empty if there are none
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
+        // Clamp both bounds at zero before deriving segment ids: a negative segment id would
+        // produce a negative array index below (e.g. a look-back window starting before time 0).
+        long segFrom = segmentId(Math.max(0L, timeFrom));
+        long segTo = segmentId(Math.max(0L, timeTo));
+
+        // Only segment ids in (currentSegmentId - segments.length, currentSegmentId] can exist,
+        // because cleanup() drops everything older and no segment newer than currentSegmentId is
+        // ever created. Restricting the scan to that window avoids walking a huge id range for
+        // wide time intervals without changing the result.
+        long firstCandidate = Math.max(segFrom, currentSegmentId - segments.length + 1);
+        long lastCandidate = Math.min(segTo, currentSegmentId);
+
+        byte[] binaryFrom = WindowStoreUtil.toBinaryKey(key, timeFrom, 0, serdes);
+        // the range scan is exclusive at the upper end, hence timeTo + 1
+        byte[] binaryUntil = WindowStoreUtil.toBinaryKey(key, timeTo + 1L, 0, serdes);
+
+        ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>();
+
+        for (long segmentId = firstCandidate; segmentId <= lastCandidate; segmentId++) {
+            Segment segment = segments[(int) (segmentId % segments.length)];
+
+            if (segment != null && segment.id == segmentId)
+                iterators.add(segment.range(binaryFrom, binaryUntil));
+        }
+
+        if (iterators.size() > 0) {
+            return new RocksDBWindowStoreIterator<>(serdes, iterators.toArray(new KeyValueIterator[iterators.size()]));
+        } else {
+            return new RocksDBWindowStoreIterator<>(serdes);
+        }
+    }
+
+    /**
+     * Returns the segment occupying the slot of the given segment id, creating and initializing
+     * a new segment first when the slot is empty. An occupied slot is returned as-is.
+     *
+     * @param segmentId the id of the wanted segment
+     * @return the segment stored in slot {@code segmentId % segments.length}
+     */
+    private Segment getSegment(long segmentId) {
+        final int slot = (int) (segmentId % segments.length);
+
+        if (segments[slot] != null) {
+            return segments[slot];
+        }
+
+        // the segment name is the store name plus the formatted segment start time
+        final String segmentName = name + "-" + directorySuffix(segmentId);
+        segments[slot] = new Segment(segmentName, segmentId);
+        segments[slot].init(context);
+        return segments[slot];
+    }
+
+    /**
+     * Closes, deletes from disk and forgets every segment whose id is
+     * {@code currentSegmentId - segments.length} or older.
+     */
+    private void cleanup() {
+        for (int slot = 0; slot < segments.length; slot++) {
+            final Segment candidate = segments[slot];
+
+            // keep empty slots and segments that are still within the retained id range
+            if (candidate == null || candidate.id > currentSegmentId - segments.length)
+                continue;
+
+            candidate.close();
+            candidate.destroy();
+            segments[slot] = null;
+        }
+    }
+
+    /**
+     * Maps a timestamp to the id of the segment that covers it.
+     *
+     * @param timestamp the record timestamp
+     * @return {@code timestamp / segmentInterval}, using integer division (truncating toward zero)
+     */
+    public long segmentId(long timestamp) {
+        return timestamp / segmentInterval;
+    }
+
+    /**
+     * Builds the segment name suffix for a segment id: the segment's start time
+     * ({@code segmentId * segmentInterval}) formatted as {@code yyyyMMddHHmm} in GMT.
+     *
+     * @param segmentId the segment id
+     * @return the formatted start time of the segment
+     */
+    public String directorySuffix(long segmentId) {
+        final long segmentStart = segmentId * segmentInterval;
+        return formatter.format(new Date(segmentStart));
+    }
+
+    /**
+     * Collects the ids of all segments that currently exist. This method is used by a test.
+     *
+     * @return a new set holding the id of every non-empty segment slot
+     */
+    public Set<Long> segmentIds() {
+        final Set<Long> liveIds = new HashSet<>();
+
+        for (int slot = 0; slot < segments.length; slot++) {
+            if (segments[slot] == null)
+                continue;
+            liveIds.add(segments[slot].id);
+        }
+
+        return liveIds;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
new file mode 100644
index 0000000..fa85ce9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Serdes;
+
+/**
+ * A {@link StateStoreSupplier} that creates window stores whose entries are kept in local
+ * RocksDB databases. Every store handed out by {@link #get()} is a {@link RocksDBWindowStore}
+ * wrapped in a {@link MeteredWindowStore}.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
+public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
+
+    private final String name;
+    private final long retentionPeriod;
+    private final boolean retainDuplicates;
+    private final int numSegments;
+    // Parameterized rather than raw so that the store construction in get() is type-checked.
+    private final Serdes<K, V> serdes;
+    private final Time time;
+
+    /**
+     * @param name             the name given to every store created by this supplier
+     * @param retentionPeriod  the retention period passed to the created stores
+     * @param numSegments      the number of segments passed to the created stores
+     * @param retainDuplicates whether the created stores keep duplicate records
+     * @param serdes           the serializers/deserializers passed to the created stores
+     * @param time             the time source passed to the metering wrapper
+     */
+    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes, Time time) {
+        this.name = name;
+        this.retentionPeriod = retentionPeriod;
+        this.retainDuplicates = retainDuplicates;
+        this.numSegments = numSegments;
+        this.serdes = serdes;
+        this.time = time;
+    }
+
+    /**
+     * @return the name of the stores created by this supplier
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Builds a new metered, RocksDB-backed window store. A fresh instance is created on every call.
+     *
+     * @return the newly created store
+     */
+    public StateStore get() {
+        return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, retentionPeriod, numSegments, retainDuplicates, serdes), "rocksdb-window", time);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
new file mode 100644
index 0000000..da5544c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.Serdes;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Tracks which keys of a store were written or deleted since the last time changes were logged
+ * and, on request, sends the corresponding records to the store's change log topic.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ */
+public class StoreChangeLogger<K, V> {
+
+    /** Looks up the current value of a key at the time its change is logged. */
+    public interface ValueGetter<K, V> {
+        V get(K key);
+    }
+
+    protected final Serdes<K, V> serialization;
+
+    // keys written since the last logChange()
+    private final Set<K> dirty;
+    // keys deleted since the last logChange()
+    private final Set<K> removed;
+    private final int maxDirty;
+    private final int maxRemoved;
+
+    private final String topic;
+    private final int partition;
+    private final ProcessorContext context;
+
+    // always wrap the logged store with the metered store
+    public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
+        this.topic = topic;
+        this.serialization = serialization;
+        this.context = context;
+        this.partition = context.id().partition;
+
+        this.dirty = new HashSet<>();
+        this.removed = new HashSet<>();
+        this.maxDirty = 100; // TODO: this needs to be configurable
+        this.maxRemoved = 100; // TODO: this needs to be configurable
+    }
+
+    /** Marks the key as written; a pending deletion of the same key is discarded. */
+    public void add(K key) {
+        dirty.add(key);
+        removed.remove(key);
+    }
+
+    /** Marks the key as deleted; a pending write of the same key is discarded. */
+    public void delete(K key) {
+        dirty.remove(key);
+        removed.add(key);
+    }
+
+    /**
+     * Logs the pending changes only when more than {@code maxDirty} keys are dirty or more than
+     * {@code maxRemoved} keys are removed.
+     */
+    public void maybeLogChange(ValueGetter<K, V> getter) {
+        final boolean tooManyDirty = dirty.size() > maxDirty;
+        final boolean tooManyRemoved = removed.size() > maxRemoved;
+
+        if (tooManyDirty || tooManyRemoved) {
+            logChange(getter);
+        }
+    }
+
+    /**
+     * Sends one record with a null value per removed key, then one record per dirty key with the
+     * value supplied by the getter, and finally clears both key sets. Nothing is sent or cleared
+     * when the context provides no record collector.
+     */
+    public void logChange(ValueGetter<K, V> getter) {
+        final RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
+        if (collector == null) {
+            return;
+        }
+
+        final Serializer<K> keySerializer = serialization.keySerializer();
+        final Serializer<V> valueSerializer = serialization.valueSerializer();
+
+        for (K removedKey : removed) {
+            collector.send(new ProducerRecord<>(topic, partition, removedKey, (V) null), keySerializer, valueSerializer);
+        }
+        for (K dirtyKey : dirty) {
+            final V currentValue = getter.get(dirtyKey);
+            collector.send(new ProducerRecord<>(topic, partition, dirtyKey, currentValue), keySerializer, valueSerializer);
+        }
+
+        removed.clear();
+        dirty.clear();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
index ba596a9..ecc303d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
 import org.apache.kafka.streams.kstream.HoppingWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -43,34 +42,21 @@ public class KStreamAggregateTest {
     private final Serializer<String> strSerializer = new StringSerializer();
     private final Deserializer<String> strDeserializer = new StringDeserializer();
 
-    private class StringCanonizeSupplier implements AggregatorSupplier<String, String, String> {
+    private class StringCanonizer implements Aggregator<String, String, String> {
 
-        private class StringCanonizer implements Aggregator<String, String, String> {
-
-            @Override
-            public String initialValue() {
-                return "0";
-            }
-
-            @Override
-            public String add(String aggKey, String value, String aggregate) {
-                return aggregate + "+" + value;
-            }
-
-            @Override
-            public String remove(String aggKey, String value, String aggregate) {
-                return aggregate + "-" + value;
-            }
+        @Override
+        public String initialValue(String aggKey) {
+            return "0";
+        }
 
-            @Override
-            public String merge(String aggr1, String aggr2) {
-                return "(" + aggr1 + ") + (" + aggr2 + ")";
-            }
+        @Override
+        public String add(String aggKey, String value, String aggregate) {
+            return aggregate + "+" + value;
         }
 
         @Override
-        public Aggregator<String, String, String> get() {
-            return new StringCanonizer();
+        public String remove(String aggKey, String value, String aggregate) {
+            return aggregate + "-" + value;
         }
     }
 
@@ -83,7 +69,7 @@ public class KStreamAggregateTest {
             String topic1 = "topic1";
 
             KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1);
-            KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringCanonizeSupplier(),
+            KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringCanonizer(),
                     HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
                     strSerializer,
                     strSerializer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index b5037ee..439aa09 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
@@ -41,34 +40,21 @@ public class KTableAggregateTest {
     private final Serializer<String> strSerializer = new StringSerializer();
     private final Deserializer<String> strDeserializer = new StringDeserializer();
 
-    private class StringCanonizeSupplier implements AggregatorSupplier<String, String, String> {
+    private class StringCanonizer implements Aggregator<String, String, String> {
 
-        private class StringCanonizer implements Aggregator<String, String, String> {
-
-            @Override
-            public String initialValue() {
-                return "0";
-            }
-
-            @Override
-            public String add(String aggKey, String value, String aggregate) {
-                return aggregate + "+" + value;
-            }
-
-            @Override
-            public String remove(String aggKey, String value, String aggregate) {
-                return aggregate + "-" + value;
-            }
+        @Override
+        public String initialValue(String aggKey) {
+            return "0";
+        }
 
-            @Override
-            public String merge(String aggr1, String aggr2) {
-                return "(" + aggr1 + ") + (" + aggr2 + ")";
-            }
+        @Override
+        public String add(String aggKey, String value, String aggregate) {
+            return aggregate + "+" + value;
         }
 
         @Override
-        public Aggregator<String, String, String> get() {
-            return new StringCanonizer();
+        public String remove(String aggKey, String value, String aggregate) {
+            return aggregate + "-" + value;
         }
     }
 
@@ -81,7 +67,7 @@ public class KTableAggregateTest {
             String topic1 = "topic1";
 
             KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
-            KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringCanonizeSupplier(),
+            KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringCanonizer(),
                     new NoOpKeyValueMapper<String, String>(),
                     strSerializer,
                     strSerializer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 5e336cc..bc6f71b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.state.OffsetCheckpoint;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index b2f45fd..85a8a15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.state.OffsetCheckpoint;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
deleted file mode 100644
index d40f308..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.junit.Test;
-
-public abstract class AbstractKeyValueStoreTest {
-
-    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context,
-                                                                      Class<K> keyClass, Class<V> valueClass,
-                                                                      boolean useContextSerdes);
-
-    @Test
-    public void testPutGetRange() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
-        try {
-
-            // Verify that the store reads and writes correctly ...
-            store.put(0, "zero");
-            store.put(1, "one");
-            store.put(2, "two");
-            store.put(4, "four");
-            store.put(5, "five");
-            assertEquals(5, driver.sizeOf(store));
-            assertEquals("zero", store.get(0));
-            assertEquals("one", store.get(1));
-            assertEquals("two", store.get(2));
-            assertNull(store.get(3));
-            assertEquals("four", store.get(4));
-            assertEquals("five", store.get(5));
-            store.delete(5);
-
-            // Flush the store and verify all current entries were properly flushed ...
-            store.flush();
-            assertEquals("zero", driver.flushedEntryStored(0));
-            assertEquals("one", driver.flushedEntryStored(1));
-            assertEquals("two", driver.flushedEntryStored(2));
-            assertEquals("four", driver.flushedEntryStored(4));
-            assertEquals(null, driver.flushedEntryStored(5));
-
-            assertEquals(false, driver.flushedEntryRemoved(0));
-            assertEquals(false, driver.flushedEntryRemoved(1));
-            assertEquals(false, driver.flushedEntryRemoved(2));
-            assertEquals(false, driver.flushedEntryRemoved(4));
-            assertEquals(true, driver.flushedEntryRemoved(5));
-
-            // Check range iteration ...
-            try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
-                while (iter.hasNext()) {
-                    Entry<Integer, String> entry = iter.next();
-                    if (entry.key().equals(2))
-                        assertEquals("two", entry.value());
-                    else if (entry.key().equals(4))
-                        assertEquals("four", entry.value());
-                    else
-                        fail("Unexpected entry: " + entry);
-                }
-            }
-
-            // Check range iteration ...
-            try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
-                while (iter.hasNext()) {
-                    Entry<Integer, String> entry = iter.next();
-                    if (entry.key().equals(2))
-                        assertEquals("two", entry.value());
-                    else if (entry.key().equals(4))
-                        assertEquals("four", entry.value());
-                    else
-                        fail("Unexpected entry: " + entry);
-                }
-            }
-        } finally {
-            store.close();
-        }
-    }
-
-    @Test
-    public void testPutGetRangeWithDefaultSerdes() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
-        try {
-
-            // Verify that the store reads and writes correctly ...
-            store.put(0, "zero");
-            store.put(1, "one");
-            store.put(2, "two");
-            store.put(4, "four");
-            store.put(5, "five");
-            assertEquals(5, driver.sizeOf(store));
-            assertEquals("zero", store.get(0));
-            assertEquals("one", store.get(1));
-            assertEquals("two", store.get(2));
-            assertNull(store.get(3));
-            assertEquals("four", store.get(4));
-            assertEquals("five", store.get(5));
-            store.delete(5);
-
-            // Flush the store and verify all current entries were properly flushed ...
-            store.flush();
-            assertEquals("zero", driver.flushedEntryStored(0));
-            assertEquals("one", driver.flushedEntryStored(1));
-            assertEquals("two", driver.flushedEntryStored(2));
-            assertEquals("four", driver.flushedEntryStored(4));
-            assertEquals(null, driver.flushedEntryStored(5));
-
-            assertEquals(false, driver.flushedEntryRemoved(0));
-            assertEquals(false, driver.flushedEntryRemoved(1));
-            assertEquals(false, driver.flushedEntryRemoved(2));
-            assertEquals(false, driver.flushedEntryRemoved(4));
-            assertEquals(true, driver.flushedEntryRemoved(5));
-        } finally {
-            store.close();
-        }
-    }
-
-    @Test
-    public void testRestore() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
-        // Add any entries that will be restored to any store
-        // that uses the driver's context ...
-        driver.addEntryToRestoreLog(0, "zero");
-        driver.addEntryToRestoreLog(1, "one");
-        driver.addEntryToRestoreLog(2, "two");
-        driver.addEntryToRestoreLog(4, "four");
-
-        // Create the store, which should register with the context and automatically
-        // receive the restore entries ...
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
-        try {
-            // Verify that the store's contents were properly restored ...
-            assertEquals(0, driver.checkForRestoredEntries(store));
-
-            // and there are no other entries ...
-            assertEquals(4, driver.sizeOf(store));
-        } finally {
-            store.close();
-        }
-    }
-
-    @Test
-    public void testRestoreWithDefaultSerdes() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
-        // Add any entries that will be restored to any store
-        // that uses the driver's context ...
-        driver.addEntryToRestoreLog(0, "zero");
-        driver.addEntryToRestoreLog(1, "one");
-        driver.addEntryToRestoreLog(2, "two");
-        driver.addEntryToRestoreLog(4, "four");
-
-        // Create the store, which should register with the context and automatically
-        // receive the restore entries ...
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
-        try {
-            // Verify that the store's contents were properly restored ...
-            assertEquals(0, driver.checkForRestoredEntries(store));
-
-            // and there are no other entries ...
-            assertEquals(4, driver.sizeOf(store));
-        } finally {
-            store.close();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
deleted file mode 100644
index 2b90d0a..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
-            ProcessorContext context,
-            Class<K> keyClass, Class<V> valueClass,
-            boolean useContextSerdes) {
-
-        StateStoreSupplier supplier;
-        if (useContextSerdes) {
-            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
-            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
-            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
-            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
-        } else {
-            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().build();
-        }
-
-        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
-        store.init(context);
-        return store;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
deleted file mode 100644
index 81adfad..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.junit.Test;
-
-public class InMemoryLRUCacheStoreTest {
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testPutGetRange() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-        StateStoreSupplier supplier = Stores.create("my-store")
-                                                     .withIntegerKeys().withStringValues()
-                                                     .inMemory().maxEntries(3)
-                                                     .build();
-        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
-        store.init(driver.context());
-
-        // Verify that the store reads and writes correctly, keeping only the last 2 entries ...
-        store.put(0, "zero");
-        store.put(1, "one");
-        store.put(2, "two");
-        store.put(3, "three");
-        store.put(4, "four");
-        store.put(5, "five");
-
-        // It should only keep the last 4 added ...
-        assertEquals(3, driver.sizeOf(store));
-        assertNull(store.get(0));
-        assertNull(store.get(1));
-        assertNull(store.get(2));
-        assertEquals("three", store.get(3));
-        assertEquals("four", store.get(4));
-        assertEquals("five", store.get(5));
-        store.delete(5);
-
-        // Flush the store and verify all current entries were properly flushed ...
-        store.flush();
-        assertNull(driver.flushedEntryStored(0));
-        assertNull(driver.flushedEntryStored(1));
-        assertNull(driver.flushedEntryStored(2));
-        assertEquals("three", driver.flushedEntryStored(3));
-        assertEquals("four", driver.flushedEntryStored(4));
-        assertNull(driver.flushedEntryStored(5));
-
-        assertEquals(true, driver.flushedEntryRemoved(0));
-        assertEquals(true, driver.flushedEntryRemoved(1));
-        assertEquals(true, driver.flushedEntryRemoved(2));
-        assertEquals(false, driver.flushedEntryRemoved(3));
-        assertEquals(false, driver.flushedEntryRemoved(4));
-        assertEquals(true, driver.flushedEntryRemoved(5));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testPutGetRangeWithDefaultSerdes() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-
-        Serializer<Integer> keySer = (Serializer<Integer>) driver.context().keySerializer();
-        Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer();
-        Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer();
-        Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer();
-        StateStoreSupplier supplier = Stores.create("my-store")
-                                                     .withKeys(keySer, keyDeser)
-                                                     .withValues(valSer, valDeser)
-                                                     .inMemory().maxEntries(3)
-                                                     .build();
-        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
-        store.init(driver.context());
-
-        // Verify that the store reads and writes correctly, keeping only the last 2 entries ...
-        store.put(0, "zero");
-        store.put(1, "one");
-        store.put(2, "two");
-        store.put(3, "three");
-        store.put(4, "four");
-        store.put(5, "five");
-
-        // It should only keep the last 4 added ...
-        assertEquals(3, driver.sizeOf(store));
-        assertNull(store.get(0));
-        assertNull(store.get(1));
-        assertNull(store.get(2));
-        assertEquals("three", store.get(3));
-        assertEquals("four", store.get(4));
-        assertEquals("five", store.get(5));
-        store.delete(5);
-
-        // Flush the store and verify all current entries were properly flushed ...
-        store.flush();
-        assertNull(driver.flushedEntryStored(0));
-        assertNull(driver.flushedEntryStored(1));
-        assertNull(driver.flushedEntryStored(2));
-        assertEquals("three", driver.flushedEntryStored(3));
-        assertEquals("four", driver.flushedEntryStored(4));
-        assertNull(driver.flushedEntryStored(5));
-
-        assertEquals(true, driver.flushedEntryRemoved(0));
-        assertEquals(true, driver.flushedEntryRemoved(1));
-        assertEquals(true, driver.flushedEntryRemoved(2));
-        assertEquals(false, driver.flushedEntryRemoved(3));
-        assertEquals(false, driver.flushedEntryRemoved(4));
-        assertEquals(true, driver.flushedEntryRemoved(5));
-    }
-
-    @Test
-    public void testRestore() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
-        // Add any entries that will be restored to any store
-        // that uses the driver's context ...
-        driver.addEntryToRestoreLog(1, "one");
-        driver.addEntryToRestoreLog(2, "two");
-        driver.addEntryToRestoreLog(4, "four");
-
-        // Create the store, which should register with the context and automatically
-        // receive the restore entries ...
-        StateStoreSupplier supplier = Stores.create("my-store")
-                                                     .withIntegerKeys().withStringValues()
-                                                     .inMemory().maxEntries(3)
-                                                     .build();
-        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
-        store.init(driver.context());
-
-        // Verify that the store's contents were properly restored ...
-        assertEquals(0, driver.checkForRestoredEntries(store));
-
-        // and there are no other entries ...
-        assertEquals(3, driver.sizeOf(store));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 108797a..b0c9bd7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -48,7 +48,7 @@ import java.util.Set;
  * A component that provides a {@link #context() ProcessingContext} that can be supplied to a {@link KeyValueStore} so that
  * all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes.
  * This class simplifies testing of various {@link KeyValueStore} instances, especially those that use
- * {@link MeteredKeyValueStore} to monitor and write its entries to the Kafka topic.
+ * {@link org.apache.kafka.streams.state.internals.MeteredKeyValueStore} to monitor and write its entries to the Kafka topic.
  * <p>
  * <h2>Basic usage</h2>
  * This component can be used to help test a {@link KeyValueStore}'s ability to read and write entries.
@@ -93,7 +93,7 @@ import java.util.Set;
  * <p>
  * <h2>Restoring a store</h2>
  * This component can be used to test whether a {@link KeyValueStore} implementation properly
- * {@link ProcessorContext#register(StateStore, StateRestoreCallback) registers itself} with the {@link ProcessorContext}, so that
+ * {@link ProcessorContext#register(StateStore, boolean, StateRestoreCallback) registers itself} with the {@link ProcessorContext}, so that
  * the persisted contents of a store are properly restored from the flushed entries when the store instance is started.
  * <p>
  * To do this, create an instance of this driver component, {@link #addEntryToRestoreLog(Object, Object) add entries} that will be

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
deleted file mode 100644
index 20e92ef..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
-            ProcessorContext context,
-            Class<K> keyClass,
-            Class<V> valueClass,
-            boolean useContextSerdes) {
-
-        StateStoreSupplier supplier;
-        if (useContextSerdes) {
-            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
-            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
-            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
-            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
-        } else {
-            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).localDatabase().build();
-        }
-
-        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
-        store.init(context);
-        return store;
-
-    }
-}


Mime
View raw message