kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [3/5] kafka git commit: KAFKA-3452: Support session windows
Date: Fri, 06 Jan 2017 18:12:36 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index b563137..a2a420e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -21,159 +21,34 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
-import java.io.File;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.SimpleTimeZone;
-import java.util.concurrent.ConcurrentHashMap;
 
-public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
-
-    public static final long MIN_SEGMENT_INTERVAL = 60 * 1000; // one minute
-
-    private volatile boolean open = false;
-
-    // use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data structures
-    private static class Segment extends RocksDBStore<Bytes, byte[]> {
-        public final long id;
-
-        Segment(String segmentName, String windowName, long id) {
-            super(segmentName, windowName, WindowStoreUtils.INNER_KEY_SERDE, WindowStoreUtils.INNER_VALUE_SERDE);
-            this.id = id;
-        }
-
-        public void destroy() {
-            Utils.delete(dbDir);
-        }
-
-        @Override
-        public void openDB(final ProcessorContext context) {
-            super.openDB(context);
-            open = true;
-        }
-    }
-
-    private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {
-        private final StateSerdes<?, V> serdes;
-        private final Iterator<Segment> segments;
-        private final Bytes from;
-        private final Bytes to;
-        private KeyValueIterator<Bytes, byte[]> currentIterator;
-        private KeyValueStore<Bytes, byte[]> currentSegment;
-
-        RocksDBWindowStoreIterator(StateSerdes<?, V> serdes) {
-            this(serdes, null, null, Collections.<Segment>emptyIterator());
-        }
-
-        RocksDBWindowStoreIterator(StateSerdes<?, V> serdes, Bytes from, Bytes to, Iterator<Segment> segments) {
-            this.serdes = serdes;
-            this.from = from;
-            this.to = to;
-            this.segments = segments;
-        }
-
-        @Override
-        public boolean hasNext() {
-            while ((currentIterator == null || !currentIterator.hasNext() || !currentSegment.isOpen())
-                    && segments.hasNext()) {
-                close();
-                currentSegment = segments.next();
-                try {
-                    currentIterator = currentSegment.range(from, to);
-                } catch (InvalidStateStoreException e) {
-                    // segment may have been closed so we ignore it.
-                }
-            }
-            return currentIterator != null && currentIterator.hasNext();
-        }
-
-        /**
-         * @throws NoSuchElementException if no next element exists
-         */
-        @Override
-        public KeyValue<Long, V> next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            KeyValue<Bytes, byte[]> kv = currentIterator.next();
-            return new KeyValue<>(WindowStoreUtils.timestampFromBinaryKey(kv.key.get()),
-                    serdes.valueFrom(kv.value));
-        }
-
-        @Override
-        public void remove() {
-
-        }
-
-        @Override
-        public void close() {
-            if (currentIterator != null) {
-                currentIterator.close();
-                currentIterator = null;
-            }
-        }
-    }
+class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     private final String name;
-    private final int numSegments;
-    private final long segmentInterval;
+    private final SegmentedBytesStore bytesStore;
     private final boolean retainDuplicates;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
-    private final SimpleDateFormat formatter;
-    private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>();
-
     private ProcessorContext context;
     private int seqnum = 0;
-    private long currentSegmentId = -1L;
-
     private StateSerdes<K, V> serdes;
 
-    private boolean loggingEnabled = false;
-    private StoreChangeLogger<Bytes, byte[]> changeLogger = null;
 
-    public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde) {
+    RocksDBWindowStore(String name, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, final SegmentedBytesStore bytesStore) {
         this.name = name;
-        this.numSegments = numSegments;
-
-        // The segment interval must be greater than MIN_SEGMENT_INTERVAL
-        this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
-
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
-
         this.retainDuplicates = retainDuplicates;
-
-
-        // Create a date formatter. Formatted timestamps are used as segment name suffixes
-        this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
-        this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT"));
-    }
-
-    public RocksDBWindowStore<K, V> enableLogging() {
-        loggingEnabled = true;
-        return this;
+        this.bytesStore = bytesStore;
     }
 
 
@@ -184,59 +59,14 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     @SuppressWarnings("unchecked")
-    public void init(ProcessorContext context, StateStore root) {
+    public void init(final ProcessorContext context, final StateStore root) {
         this.context = context;
-
         // construct the serde
         this.serdes = new StateSerdes<>(name,
-                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-
-        openExistingSegments();
-
-        this.changeLogger = this.loggingEnabled ? new StoreChangeLogger(name, context, WindowStoreUtils.INNER_SERDES) : null;
-
-        // register and possibly restore the state from the logs
-        context.register(root, loggingEnabled, new StateRestoreCallback() {
-            @Override
-            public void restore(byte[] key, byte[] value) {
-                // if the value is null, it means that this record has already been
-                // deleted while it was captured in the changelog, hence we do not need to put any more.
-                if (value != null)
-                    putInternal(key, value);
-            }
-        });
-
-        flush();
-        open = true;
-    }
-
-    private void openExistingSegments() {
-        try {
-            File dir = new File(context.stateDir(), name);
+                                        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                                        valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
-            if (dir.exists()) {
-                String[] list = dir.list();
-                if (list != null) {
-                    long[] segmentIds = new long[list.length];
-                    for (int i = 0; i < list.length; i++)
-                        segmentIds[i] = segmentIdFromSegmentName(list[i]);
-
-                    // open segments in the id order
-                    Arrays.sort(segmentIds);
-                    for (long segmentId : segmentIds) {
-                        if (segmentId >= 0) {
-                            currentSegmentId = segmentId;
-                            getOrCreateSegment(segmentId);
-                        }
-                    }
-                }
-            } else {
-                dir.mkdir();
-            }
-        } catch (Exception ex) {
-            // ignore
-        }
+        bytesStore.init(context, root);
     }
 
     @Override
@@ -246,26 +76,17 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     public boolean isOpen() {
-        return open;
+        return bytesStore.isOpen();
     }
 
     @Override
     public void flush() {
-        for (KeyValueStore<Bytes, byte[]> segment : segments.values()) {
-            if (segment != null) {
-                segment.flush();
-            }
-        }
+        bytesStore.flush();
     }
 
     @Override
     public void close() {
-        open = false;
-        flush();
-        for (KeyValueStore segment : segments.values()) {
-            if (segment != null)
-                segment.close();
-        }
+        bytesStore.close();
     }
 
     @Override
@@ -275,166 +96,57 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     public void put(K key, V value, long timestamp) {
-        final byte[] rawValue = serdes.rawValue(value);
-        byte[] rawKey = putAndReturnInternalKey(key, rawValue, timestamp);
-
-        if (rawKey != null && loggingEnabled) {
-            changeLogger.logChange(Bytes.wrap(rawKey), rawValue);
-        }
-    }
-
-    private byte[] putAndReturnInternalKey(K key, byte[] value, long timestamp) {
-        long segmentId = segmentId(timestamp);
-
-        if (segmentId > currentSegmentId) {
-            // A new segment will be created. Clean up old segments first.
-            currentSegmentId = segmentId;
-            cleanup();
-        }
-
-        // If the record is within the retention period, put it in the store.
-        KeyValueStore<Bytes, byte[]> segment = getOrCreateSegment(segmentId);
-        if (segment != null) {
-            if (retainDuplicates)
-                seqnum = (seqnum + 1) & 0x7FFFFFFF;
-            byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes);
-            segment.put(Bytes.wrap(binaryKey), value);
-            return binaryKey;
-        } else {
-            return null;
-        }
-    }
-
-    private void putInternal(byte[] binaryKey, byte[] binaryValue) {
-        final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey);
-        long segmentId = segmentId(timestamp);
-
-        if (segmentId > currentSegmentId) {
-            // A new segment will be created. Clean up old segments first.
-            currentSegmentId = segmentId;
-            cleanup();
-        }
-
-        // If the record is within the retention period, put it in the store.
-        Segment segment = getOrCreateSegment(segmentId);
-        if (segment != null) {
-            segment.writeToStore(Bytes.wrap(binaryKey), binaryValue);
+        if (retainDuplicates) {
+            seqnum = (seqnum + 1) & 0x7FFFFFFF;
         }
+        bytesStore.put(Bytes.wrap(WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes)), serdes.rawValue(value));
     }
 
-    private byte[] getInternal(byte[] binaryKey) {
-        long segmentId = segmentId(WindowStoreUtils.timestampFromBinaryKey(binaryKey));
-
-        KeyValueStore<Bytes, byte[]> segment = getSegment(segmentId);
-        if (segment != null) {
-            return segment.get(Bytes.wrap(binaryKey));
-        } else {
-            return null;
-        }
-    }
 
     @SuppressWarnings("unchecked")
     @Override
     public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
-        if (!isOpen()) {
-            throw new InvalidStateStoreException("Store " + this.name + " is currently not isOpen");
-        }
-        long segFrom = segmentId(timeFrom);
-        long segTo = segmentId(Math.max(0L, timeTo));
-
-        byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
-        byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, Integer.MAX_VALUE, serdes);
-
-        final List<Segment> segments = new ArrayList<>();
-        for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
-            Segment segment = getSegment(segmentId);
-            if (segment != null && segment.isOpen()) {
-                try {
-                    segments.add(segment);
-                } catch (InvalidStateStoreException ise) {
-                    // segment may have been closed by streams thread;
-                }
-            }
-        }
-
-        if (!segments.isEmpty()) {
-            return new RocksDBWindowStoreIterator<>(serdes, Bytes.wrap(binaryFrom), Bytes.wrap(binaryTo), segments.iterator());
-        } else {
-            return new RocksDBWindowStoreIterator<>(serdes);
-        }
-    }
-
-    private Segment getSegment(long segmentId) {
-        final Segment segment = segments.get(segmentId % numSegments);
-        if (!isSegment(segment, segmentId)) {
-            return null;
-        }
-        return segment;
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
+        return new TheWindowStoreIterator<>(bytesIterator, serdes);
     }
 
-    private boolean isSegment(final Segment store, long segmentId) {
-        return store != null && store.id == segmentId;
-    }
+    private static class TheWindowStoreIterator<V> implements WindowStoreIterator<V> {
+        private final KeyValueIterator<Bytes, byte[]> actual;
+        private final StateSerdes<?, V> serdes;
 
-    private Segment getOrCreateSegment(long segmentId) {
-        if (segmentId <= currentSegmentId && segmentId > currentSegmentId - numSegments) {
-            final long key = segmentId % numSegments;
-            final Segment segment = segments.get(key);
-            if (!isSegment(segment, segmentId)) {
-                cleanup();
-            }
-            if (!segments.containsKey(key)) {
-                Segment newSegment = new Segment(segmentName(segmentId), name, segmentId);
-                newSegment.openDB(context);
-                segments.put(key, newSegment);
-            }
-            return segments.get(key);
+        TheWindowStoreIterator(final KeyValueIterator<Bytes, byte[]> actual, final StateSerdes<?, V> serdes) {
+            this.actual = actual;
+            this.serdes = serdes;
+        }
 
-        } else {
-            return null;
+        @Override
+        public boolean hasNext() {
+            return actual.hasNext();
         }
-    }
 
-    private void cleanup() {
-        for (Map.Entry<Long, Segment> segmentEntry : segments.entrySet()) {
-            final Segment segment = segmentEntry.getValue();
-            if (segment != null && segment.id <= currentSegmentId - numSegments) {
-                segments.remove(segmentEntry.getKey());
-                segment.close();
-                segment.destroy();
+        /**
+         * @throws NoSuchElementException if no next element exists
+         */
+        @Override
+        public KeyValue<Long, V> next() {
+            if (!actual.hasNext()) {
+                throw new NoSuchElementException();
             }
+            final KeyValue<Bytes, byte[]> next = actual.next();
+            final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
+            final V value = serdes.valueFrom(next.value);
+            return KeyValue.pair(timestamp, value);
         }
-    }
-
-    private long segmentId(long timestamp) {
-        return timestamp / segmentInterval;
-    }
-
-    // this method is defined public since it is used for unit tests
-    public String segmentName(long segmentId) {
-        return name + "-" + formatter.format(new Date(segmentId * segmentInterval));
-    }
 
-    public long segmentIdFromSegmentName(String segmentName) {
-        try {
-            Date date = formatter.parse(segmentName.substring(name.length() + 1));
-            return date.getTime() / segmentInterval;
-        } catch (Exception ex) {
-            return -1L;
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
         }
-    }
 
-    // this method is defined public since it is used for unit tests
-    public Set<Long> segmentIds() {
-        HashSet<Long> segmentIds = new HashSet<>();
-
-        for (Segment segment : segments.values()) {
-            if (segment != null) {
-                segmentIds.add(segment.id);
-            }
+        @Override
+        public void close() {
+            actual.close();
         }
-
-        return segmentIds;
     }
 
 }
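
The refactored store above no longer opens or rolls segments itself; it only builds the composite binary key and hands storage to the injected SegmentedBytesStore. Below is a self-contained sketch of the key layout implied by the toBinaryKey/timestampFromBinaryKey calls: serialized record key, then an 8-byte timestamp, then a 4-byte sequence number. The exact layout lives in WindowStoreUtils and is assumed here, not quoted from it.

import java.nio.ByteBuffer;

public class WindowKeySketch {
    static byte[] toBinaryKey(final byte[] serializedKey, final long timestamp, final int seqnum) {
        final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + 8 + 4);
        buf.put(serializedKey); // record key bytes first, so entries for the same key sort together
        buf.putLong(timestamp); // then the timestamp
        buf.putInt(seqnum);     // then the sequence number used when duplicates are retained
        return buf.array();
    }

    static long timestampFromBinaryKey(final byte[] binaryKey) {
        // the timestamp sits 12 bytes from the end (8 for the long, 4 for the seqnum)
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 12);
    }

    public static void main(final String[] args) {
        final byte[] composite = toBinaryKey("user-1".getBytes(), 1483725156000L, 0);
        System.out.println(timestampFromBinaryKey(composite)); // prints 1483725156000
    }
}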

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index 49d8882..84f1734 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -18,8 +18,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.WindowStore;
 
@@ -34,7 +32,7 @@ import java.util.Map;
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
 
-public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, WindowStore> {
+public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, WindowStore> implements WindowStoreSupplier<WindowStore> {
 
     private final long retentionPeriod;
     private final boolean retainDuplicates;
@@ -60,15 +58,22 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V
     }
 
     public WindowStore get() {
+        final RocksDBSegmentedBytesStore bytesStore = new RocksDBSegmentedBytesStore(name, retentionPeriod, numSegments, new WindowStoreKeySchema());
         if (!enableCaching) {
-            final RocksDBWindowStore<K, V> rocksDbStore = new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde);
-            return new MeteredWindowStore<>(logged ? rocksDbStore.enableLogging() : rocksDbStore, "rocksdb-window", time);
+            final RocksDBWindowStore<K, V> segmentedStore = new RocksDBWindowStore<>(name, retainDuplicates, keySerde, valueSerde,
+                                                                                     logged ? new ChangeLoggingSegmentedBytesStore(bytesStore)
+                                                                                                   : bytesStore);
+            return new MeteredWindowStore<>(segmentedStore, "rocksdb-window", time);
         }
 
-        final RocksDBWindowStore<Bytes, byte[]> store = new RocksDBWindowStore<>(name, retentionPeriod, numSegments, false, Serdes.Bytes(), Serdes.ByteArray());
-        return new CachingWindowStore<>(new MeteredWindowStore<>(logged ? store.enableLogging() : store, "rocksdb-window", time), keySerde, valueSerde, windowSize);
+        return new CachingWindowStore<>(new MeteredSegmentedBytesStore(logged ? new ChangeLoggingSegmentedBytesStore(bytesStore)
+                                                                               : bytesStore,
+                                                                       "rocksdb-window",
+                                                                       time),
+                                        keySerde, valueSerde, windowSize);
     }
 
+    @Override
     public long retentionPeriod() {
         return retentionPeriod;
     }
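
The supplier now builds the store by stacking wrappers around a single RocksDBSegmentedBytesStore: optional change-logging, then metering, then (when caching is enabled) a caching layer. A toy sketch of that decorator layering follows, with illustrative names standing in for the Kafka classes; each wrapper adds one concern without the inner store knowing about it.

interface BytesStore {
    void put(String key, byte[] value);
}

class InnerStore implements BytesStore {
    public void put(final String key, final byte[] value) { /* write to RocksDB */ }
}

class LoggingStore implements BytesStore {
    private final BytesStore inner;
    LoggingStore(final BytesStore inner) { this.inner = inner; }
    public void put(final String key, final byte[] value) {
        inner.put(key, value);
        // ... append the same record to the changelog topic here
    }
}

class MeteredStore implements BytesStore {
    private final BytesStore inner;
    MeteredStore(final BytesStore inner) { this.inner = inner; }
    public void put(final String key, final byte[] value) {
        final long start = System.nanoTime();
        inner.put(key, value);
        // ... record System.nanoTime() - start in a put-latency sensor
    }
}

// Composition mirrors get(): metering outermost, logging optional.
// BytesStore store = new MeteredStore(new LoggingStore(new InnerStore()));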

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
new file mode 100644
index 0000000..5471262
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+// use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data structures
+class Segment extends RocksDBStore<Bytes, byte[]> {
+    public final long id;
+
+    Segment(String segmentName, String windowName, long id) {
+        super(segmentName, windowName, WindowStoreUtils.INNER_KEY_SERDE, WindowStoreUtils.INNER_VALUE_SERDE);
+        this.id = id;
+    }
+
+    public void destroy() {
+        Utils.delete(dbDir);
+    }
+
+    @Override
+    public void openDB(final ProcessorContext context) {
+        super.openDB(context);
+        open = true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
new file mode 100644
index 0000000..ed2da9c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterate over multiple Segments
+ * @param <K>
+ * @param <V>
+ */
+class SegmentIterator implements KeyValueIterator<Bytes, byte[]> {
+
+    private final Iterator<Segment> segments;
+    private final HasNextCondition hasNextCondition;
+    private final Bytes from;
+    private final Bytes to;
+    private KeyValueIterator<Bytes, byte[]> currentIterator;
+    private KeyValueStore<Bytes, byte[]> currentSegment;
+
+    SegmentIterator(final Iterator<Segment> segments,
+                    final HasNextCondition hasNextCondition,
+                    final Bytes from,
+                    final Bytes to) {
+        this.segments = segments;
+        this.hasNextCondition = hasNextCondition;
+        this.from = from;
+        this.to = to;
+    }
+
+    public void close() {
+        if (currentIterator != null) {
+            currentIterator.close();
+            currentIterator = null;
+        }
+    }
+
+    @Override
+    public Bytes peekNextKey() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        return currentIterator.peekNextKey();
+    }
+
+    @SuppressWarnings("unchecked")
+    public boolean hasNext() {
+        boolean hasNext = false;
+        while ((currentIterator == null || !(hasNext = hasNextCondition.hasNext(currentIterator)) || !currentSegment.isOpen())
+                && segments.hasNext()) {
+            close();
+            currentSegment = segments.next();
+            try {
+                currentIterator = currentSegment.range(from, to);
+            } catch (InvalidStateStoreException e) {
+                // segment may have been closed so we ignore it.
+            }
+        }
+        return currentIterator != null && hasNext;
+    }
+
+    public KeyValue<Bytes, byte[]> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        return currentIterator.next();
+    }
+
+    public void remove() {
+        throw new UnsupportedOperationException("remove not supported");
+    }
+
+}
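
SegmentIterator presents several per-segment range scans as one iterator, opening each scan lazily inside hasNext(). The same pattern, stripped of the store-specific error handling, as a self-contained sketch:

import java.util.Iterator;
import java.util.NoSuchElementException;

class ChainedIterator<T> implements Iterator<T> {
    private final Iterator<Iterator<T>> sources;
    private Iterator<T> current;

    ChainedIterator(final Iterator<Iterator<T>> sources) {
        this.sources = sources;
    }

    @Override
    public boolean hasNext() {
        // advance to the next source whenever the current one is exhausted
        while ((current == null || !current.hasNext()) && sources.hasNext()) {
            current = sources.next();
        }
        return current != null && current.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}

// e.g. chaining two segment scans:
// Iterator<String> all = new ChainedIterator<>(
//         Arrays.asList(segment1.iterator(), segment2.iterator()).iterator());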

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
new file mode 100644
index 0000000..ab1099e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.List;
+
+/**
+ * The interface representing a StateStore that has 1 or more segments that are based
+ * on time.
+ * @see RocksDBSegmentedBytesStore
+ */
+public interface SegmentedBytesStore extends StateStore {
+
+    /**
+     * Fetch all records from the segmented store with the provided key and time range
+     * from all existing segments
+     * @param key       the key to match
+     * @param from      earliest time to match
+     * @param to        latest time to match
+     * @return  an iterator over key-value pairs
+     */
+    KeyValueIterator<Bytes, byte[]> fetch(Bytes key, long from, long to);
+
+    /**
+     * Remove the record with the provided key. The key
+     * should be a composite of the record key, and the timestamp information etc
+     * as described by the {@link KeySchema}
+     * @param key   the segmented key to remove
+     */
+    void remove(Bytes key);
+
+    /**
+     * Write a new value to the store with the provided key. The key
+     * should be a composite of the record key, and the timestamp information etc
+     * as described by the {@link KeySchema}
+     * @param key
+     * @param value
+     */
+    void put(Bytes key, byte[] value);
+
+    /**
+     * Get the record from the store with the given key. The key
+     * should be a composite of the record key, and the timestamp information etc
+     * as described by the {@link KeySchema}
+     * @param key
+     * @return
+     */
+    byte[] get(Bytes key);
+
+
+    interface KeySchema {
+        /**
+         * Given a record-key and a time, construct a Segmented key that represents
+         * the upper range of keys to search when performing range queries.
+         * @see SessionKeySchema#upperRange
+         * @see WindowStoreKeySchema#upperRange
+         * @param key
+         * @param to
+         * @return      The key that represents the upper range to search for in the store
+         */
+        Bytes upperRange(final Bytes key, final long to);
+
+        /**
+         * Given a record-key and a time, construct a Segmented key that represents
+         * the lower range of keys to search when performing range queries.
+         * @see SessionKeySchema#lowerRange
+         * @see WindowStoreKeySchema#lowerRange
+         * @param key
+         * @param from
+         * @return      The key that represents the lower range to search for in the store
+         */
+        Bytes lowerRange(final Bytes key, final long from);
+
+        /**
+         * Extract the timestamp of the segment from the key. The key is a composite of
+         * the record-key, any timestamps, plus any additional information.
+         * @see SessionKeySchema#lowerRange
+         * @see WindowStoreKeySchema#lowerRange
+         * @param key
+         * @return
+         */
+        long segmentTimestamp(final Bytes key);
+
+        /**
+         * Create an implementation of {@link HasNextCondition} that knows when
+         * to stop iterating over the Segments. Used during {@link SegmentedBytesStore#fetch(Bytes, long, long)} operations
+         * @param binaryKey     the record-key
+         * @param from          starting time range
+         * @param to            ending time range
+         * @return
+         */
+        HasNextCondition hasNextCondition(final Bytes binaryKey, long from, long to);
+
+        /**
+         * Used during {@link SegmentedBytesStore#fetch(Bytes, long, long)} operations to determine
+         * which segments should be scanned.
+         * @param segments
+         * @param from
+         * @param to
+         * @return  List of segments to search
+         */
+        List<Segment> segmentsToSearch(Segments segments, long from, long to);
+    }
+}
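
The KeySchema methods exist so an implementation can answer a fetch with one ordered range scan per segment: lowerRange and upperRange bracket the composite keyspace, and hasNextCondition filters within it. A self-contained sketch of the underlying idea, with a TreeMap standing in for RocksDB's byte-ordered keyspace and an illustrative key encoding:

import java.util.NavigableMap;
import java.util.TreeMap;

public class RangeSketch {
    public static void main(final String[] args) {
        final NavigableMap<String, Long> store = new TreeMap<>();
        // composite key = recordKey + "@" + zero-padded timestamp
        store.put("user-1@0000000100", 1L);
        store.put("user-1@0000000200", 2L);
        store.put("user-1@0000000300", 3L);
        store.put("user-2@0000000150", 9L);

        // lowerRange("user-1", 100) .. upperRange("user-1", 200)
        final String lower = "user-1@0000000100";
        final String upper = "user-1@0000000200";
        System.out.println(store.subMap(lower, true, upper, true));
        // prints {user-1@0000000100=1, user-1@0000000200=2}; user-2 is never scanned
    }
}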

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
new file mode 100644
index 0000000..13cdf6c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.SimpleTimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Manages the {@link Segment}s that are used by the {@link RocksDBSegmentedBytesStore}
+ */
+class Segments {
+    static final long MIN_SEGMENT_INTERVAL = 60 * 1000L;
+    private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>();
+    private final String name;
+    private final int numSegments;
+    private final long segmentInterval;
+    private final SimpleDateFormat formatter;
+    private long currentSegmentId = -1L;
+
+    Segments(final String name, final long retentionPeriod, final int numSegments) {
+        this.name = name;
+        this.numSegments = numSegments;
+        this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
+        // Create a date formatter. Formatted timestamps are used as segment name suffixes
+        this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
+        this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
+    }
+
+    long segmentId(long timestamp) {
+        return timestamp / segmentInterval;
+    }
+
+    String segmentName(long segmentId) {
+        return name + "-" + formatter.format(new Date(segmentId * segmentInterval));
+    }
+
+    Segment getSegmentForTimestamp(final long timestamp) {
+        return getSegment(segmentId(timestamp));
+    }
+
+    Segment getOrCreateSegment(final long segmentId, final ProcessorContext context) {
+        if (segmentId > currentSegmentId || segmentId > currentSegmentId - numSegments) {
+            final long key = segmentId % numSegments;
+            final Segment segment = segments.get(key);
+            if (!isSegment(segment, segmentId)) {
+                cleanup(segmentId);
+            }
+            if (!segments.containsKey(key)) {
+                Segment newSegment = new Segment(segmentName(segmentId), name, segmentId);
+                newSegment.openDB(context);
+                segments.put(key, newSegment);
+                currentSegmentId = segmentId > currentSegmentId ? segmentId : currentSegmentId;
+            }
+            return segments.get(key);
+        } else {
+            return null;
+        }
+    }
+
+
+    void openExisting(final ProcessorContext context) {
+        try {
+            File dir = new File(context.stateDir(), name);
+            if (dir.exists()) {
+                String[] list = dir.list();
+                if (list != null) {
+                    long[] segmentIds = new long[list.length];
+                    for (int i = 0; i < list.length; i++)
+                        segmentIds[i] = segmentIdFromSegmentName(list[i]);
+
+                    // open segments in the id order
+                    Arrays.sort(segmentIds);
+                    for (long segmentId : segmentIds) {
+                        if (segmentId >= 0) {
+                            getOrCreateSegment(segmentId, context);
+                        }
+                    }
+                }
+            } else {
+                if (!dir.mkdir()) {
+                    throw new ProcessorStateException(String.format("dir %s doesn't exist and cannot be created for store %s", dir, name));
+                }
+            }
+        } catch (Exception ex) {
+            // ignore
+        }
+    }
+
+    List<Segment> segments(final long timeFrom, final long timeTo) {
+        final long segFrom = segmentId(Math.max(0L, timeFrom));
+        final long segTo = segmentId(Math.min(currentSegmentId * segmentInterval, Math.max(0, timeTo)));
+
+        final List<Segment> segments = new ArrayList<>();
+        for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
+            Segment segment = getSegment(segmentId);
+            if (segment != null && segment.isOpen()) {
+                try {
+                    segments.add(segment);
+                } catch (InvalidStateStoreException ise) {
+                    // segment may have been closed by streams thread;
+                }
+            }
+        }
+        return segments;
+    }
+
+    void flush() {
+        for (Segment segment : segments.values()) {
+            segment.flush();
+        }
+    }
+
+    public void close() {
+        for (Segment segment : segments.values()) {
+            segment.close();
+        }
+    }
+
+    private Segment getSegment(long segmentId) {
+        final Segment segment = segments.get(segmentId % numSegments);
+        if (!isSegment(segment, segmentId)) {
+            return null;
+        }
+        return segment;
+    }
+
+    private boolean isSegment(final Segment store, long segmentId) {
+        return store != null && store.id == segmentId;
+    }
+
+    private void cleanup(final long segmentId) {
+        final long oldestSegmentId = currentSegmentId < segmentId
+                ? segmentId - numSegments
+                : currentSegmentId - numSegments;
+
+        for (Map.Entry<Long, Segment> segmentEntry : segments.entrySet()) {
+            final Segment segment = segmentEntry.getValue();
+            if (segment != null && segment.id <= oldestSegmentId) {
+                segments.remove(segmentEntry.getKey());
+                segment.close();
+                segment.destroy();
+            }
+        }
+    }
+
+    private long segmentIdFromSegmentName(String segmentName) {
+        try {
+            Date date = formatter.parse(segmentName.substring(name.length() + 1));
+            return date.getTime() / segmentInterval;
+        } catch (Exception ex) {
+            return -1L;
+        }
+    }
+}
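
The arithmetic above maps timestamps to segment ids, and segment ids modulo numSegments to the reusable slots in the ConcurrentHashMap. A runnable sketch with illustrative values:

public class SegmentMath {
    public static void main(final String[] args) {
        final long retentionPeriod = 60_000L * 10; // 10 minutes
        final int numSegments = 3;
        final long minSegmentInterval = 60 * 1000L;
        final long segmentInterval = Math.max(retentionPeriod / (numSegments - 1), minSegmentInterval);
        System.out.println(segmentInterval); // 300000, i.e. 5 minutes per segment

        final long timestamp = 1_483_725_156_000L;
        final long segmentId = timestamp / segmentInterval;
        System.out.println(segmentId);               // which segment the record lands in
        System.out.println(segmentId % numSegments); // the map slot that segment occupies
    }
}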

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
new file mode 100644
index 0000000..b15eec9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.List;
+
+
+class SessionKeySchema implements SegmentedBytesStore.KeySchema {
+
+    @Override
+    public Bytes upperRange(final Bytes key, final long to) {
+        final Windowed<Bytes> sessionKey = new Windowed<>(key, new TimeWindow(to, Long.MAX_VALUE));
+        return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer());
+    }
+
+    @Override
+    public Bytes lowerRange(final Bytes key, final long from) {
+        final Windowed<Bytes> sessionKey = new Windowed<>(key, new TimeWindow(0, Math.max(0, from)));
+        return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer());
+    }
+
+    @Override
+    public long segmentTimestamp(final Bytes key) {
+        return SessionKeySerde.extractEnd(key.get());
+    }
+
+    @Override
+    public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, final long to) {
+        return new HasNextCondition() {
+            @Override
+            public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
+                if (iterator.hasNext()) {
+                    final Bytes bytes = iterator.peekNextKey();
+                    final Bytes keyBytes = Bytes.wrap(SessionKeySerde.extractKeyBytes(bytes.get()));
+                    if (!keyBytes.equals(binaryKey)) {
+                        return false;
+                    }
+                    final long start = SessionKeySerde.extractStart(bytes.get());
+                    final long end = SessionKeySerde.extractEnd(bytes.get());
+                    return end >= from && start <= to;
+                }
+                return false;
+            }
+        };
+    }
+
+    @Override
+    public List<Segment> segmentsToSearch(final Segments segments, final long from, final long to) {
+        return segments.segments(from, Long.MAX_VALUE);
+    }
+
+}
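
The hasNextCondition returned here keeps iterating while a stored session [start, end] intersects the queried range [from, to]; the final test in the condition is the standard closed-interval overlap check, sketched standalone:

public class SessionOverlap {
    static boolean overlaps(final long start, final long end, final long from, final long to) {
        // two closed intervals intersect iff each starts before the other ends
        return end >= from && start <= to;
    }

    public static void main(final String[] args) {
        System.out.println(overlaps(0, 5, 5, 10)); // true: they touch at 5
        System.out.println(overlaps(0, 4, 5, 10)); // false: the session ended before the range began
    }
}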

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index c6c3030..a66c961 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -234,24 +234,25 @@ public class ThreadCache {
     }
 
 
-    static class MemoryLRUCacheBytesIterator implements PeekingKeyValueIterator<byte[], LRUCacheEntry> {
+    static class MemoryLRUCacheBytesIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
         private final Iterator<Bytes> keys;
         private final NamedCache cache;
-        private KeyValue<byte[], LRUCacheEntry> nextEntry;
+        private KeyValue<Bytes, LRUCacheEntry> nextEntry;
 
         MemoryLRUCacheBytesIterator(final Iterator<Bytes> keys, final NamedCache cache) {
             this.keys = keys;
             this.cache = cache;
         }
 
-        public byte[] peekNextKey() {
+        public Bytes peekNextKey() {
             if (!hasNext()) {
                 throw new NoSuchElementException();
             }
             return nextEntry.key;
         }
 
-        KeyValue<byte[], LRUCacheEntry> peekNext() {
+
+        public KeyValue<Bytes, LRUCacheEntry> peekNext() {
             if (!hasNext()) {
                 throw new NoSuchElementException();
             }
@@ -272,11 +273,11 @@ public class ThreadCache {
         }
 
         @Override
-        public KeyValue<byte[], LRUCacheEntry> next() {
+        public KeyValue<Bytes, LRUCacheEntry> next() {
             if (!hasNext()) {
                 throw new NoSuchElementException();
             }
-            final KeyValue<byte[], LRUCacheEntry> result = nextEntry;
+            final KeyValue<Bytes, LRUCacheEntry> result = nextEntry;
             nextEntry = null;
             return result;
         }
@@ -288,7 +289,7 @@ public class ThreadCache {
                 return;
             }
 
-            nextEntry = new KeyValue<>(cacheKey.get(), entry);
+            nextEntry = new KeyValue<>(cacheKey, entry);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java
new file mode 100644
index 0000000..093161e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import java.util.List;
+
+class WindowStoreKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
+    private static final HasNextCondition ITERATOR_HAS_NEXT = new HasNextCondition() {
+        @Override
+        public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
+            return iterator.hasNext();
+        }
+    };
+    private final StateSerdes<Bytes, byte[]> serdes = new StateSerdes<>("window-store-key-schema", Serdes.Bytes(), Serdes.ByteArray());
+
+    @Override
+    public Bytes upperRange(final Bytes key, final long to) {
+        return Bytes.wrap(WindowStoreUtils.toBinaryKey(key, to, Integer.MAX_VALUE, serdes));
+    }
+
+    @Override
+    public Bytes lowerRange(final Bytes key, final long from) {
+        return Bytes.wrap(WindowStoreUtils.toBinaryKey(key, Math.max(0, from), 0, serdes));
+    }
+
+    @Override
+    public long segmentTimestamp(final Bytes key) {
+        return WindowStoreUtils.timestampFromBinaryKey(key.get());
+    }
+
+    @Override
+    public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, final long to) {
+        return ITERATOR_HAS_NEXT;
+    }
+
+    @Override
+    public List<Segment> segmentsToSearch(final Segments segments, final long from, final long to) {
+        return segments.segments(from, to);
+    }
+}
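
lowerRange pins the sequence number to 0 and upperRange to Integer.MAX_VALUE so that, when duplicates are retained, every entry sharing the boundary (key, timestamp) stays inside the scan. A sketch under the same layout assumption as the earlier key sketch, with an unsigned lexicographic comparison standing in for the store's byte ordering:

import java.nio.ByteBuffer;

public class SeqnumBounds {
    static byte[] key(final long ts, final int seqnum) {
        return ByteBuffer.allocate(12).putLong(ts).putInt(seqnum).array();
    }

    // unsigned lexicographic comparison, as a byte-ordered store would perform
    static int compare(final byte[] a, final byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            final int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) return cmp;
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(final String[] args) {
        final byte[] lower = key(100L, 0);
        final byte[] dup1 = key(100L, 1);
        final byte[] dup2 = key(100L, 2);
        final byte[] upper = key(100L, Integer.MAX_VALUE);
        // both duplicates sort strictly inside [lower, upper]
        System.out.println(compare(lower, dup1) < 0 && compare(dup2, upper) < 0); // true
    }
}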

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
new file mode 100644
index 0000000..9c93dcc
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+public interface WindowStoreSupplier<T extends StateStore> extends StateStoreSupplier<T> {
+    long retentionPeriod();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 0f70588..b443abc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -165,7 +165,7 @@ public class JoinIntegrationTest {
 
     private void checkResult(final String outputTopic, final List<String> expectedResult) throws Exception {
         if (expectedResult != null) {
-            final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), Long.MAX_VALUE);
+            final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), 30 * 1000L);
             assertThat(result, is(expectedResult));
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index bdbc1c8..cac8384 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -29,14 +29,20 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -48,12 +54,18 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertFalse;
 
 public class KStreamAggregationIntegrationTest {
     private static final int NUM_BROKERS = 1;
@@ -68,6 +80,7 @@ public class KStreamAggregationIntegrationTest {
     private Properties streamsConfiguration;
     private KafkaStreams kafkaStreams;
     private String streamOneInput;
+    private String userSessionsStream = "user-sessions";
     private String outputTopic;
     private KGroupedStream<String, String> groupedStream;
     private Reducer<String> reducer;
@@ -416,6 +429,194 @@ public class KStreamAggregationIntegrationTest {
 
     }
 
+    @Test
+    public void shouldCountSessionWindows() throws Exception {
+        final long sessionGap = 5 * 60 * 1000L;
+        final long maintainMillis = sessionGap * 3;
+
+        final long t1 = mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
+        final List<KeyValue<String, String>> t1Messages = Arrays.asList(new KeyValue<>("bob", "start"),
+                                                                        new KeyValue<>("penny", "start"),
+                                                                        new KeyValue<>("jo", "pause"),
+                                                                        new KeyValue<>("emily", "pause"));
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                userSessionsStream,
+                t1Messages,
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                t1);
+
+        final long t2 = t1 + (sessionGap / 2);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                userSessionsStream,
+                Collections.singletonList(
+                        new KeyValue<>("emily", "resume")
+                ),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                t2);
+        final long t3 = t1 + sessionGap + 1;
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                userSessionsStream,
+                Arrays.asList(
+                        new KeyValue<>("bob", "pause"),
+                        new KeyValue<>("penny", "stop")
+                ),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                t3);
+
+        final long t4 = t3 + (sessionGap / 2);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                userSessionsStream,
+                Arrays.asList(
+                        new KeyValue<>("bob", "resume"), // bobs session continues
+                        new KeyValue<>("jo", "resume")   // jo's starts new session
+                ),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                t4);
+
+        final Map<Windowed<String>, Long> results = new HashMap<>();
+        final CountDownLatch latch = new CountDownLatch(11); // one update per input record, plus the updates caused by the two session merges
+        builder.stream(Serdes.String(), Serdes.String(), userSessionsStream)
+                .groupByKey(Serdes.String(), Serdes.String())
+                .count(SessionWindows.with(sessionGap).until(maintainMillis), "UserSessionsStore")
+                .toStream()
+                .foreach(new ForeachAction<Windowed<String>, Long>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final Long value) {
+                        results.put(key, value);
+                        latch.countDown();
+                    }
+                });
+
+        startStreams();
+        latch.await(30, TimeUnit.SECONDS);
+        assertThat(results.get(new Windowed<>("bob", new TimeWindow(t1, t1))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("penny", new TimeWindow(t1, t1))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("jo", new TimeWindow(t1, t1))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("jo", new TimeWindow(t4, t4))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("emily", new TimeWindow(t1, t2))), equalTo(2L));
+        assertThat(results.get(new Windowed<>("bob", new TimeWindow(t3, t4))), equalTo(2L));
+        assertThat(results.get(new Windowed<>("penny", new TimeWindow(t3, t3))), equalTo(1L));
+    }
+
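The assertions above follow from the session-merge rule: two records for the same key fall into one session when their timestamps are within sessionGap of each other, and a merged session spans from its earliest to its latest record. A minimal sketch of that rule (illustrative only, not the store's actual merge code; the Session type below is an assumption):

    // Gap-based session merging, sketched for illustration.
    final class Session {
        final long start; // timestamp of the earliest record in the session
        final long end;   // timestamp of the latest record in the session

        Session(final long start, final long end) {
            this.start = start;
            this.end = end;
        }

        // Two sessions belong together when their closest edges are at most `gap` apart.
        boolean mergeableWith(final Session other, final long gap) {
            return other.start <= end + gap && start <= other.end + gap;
        }

        // The merged session covers both constituents.
        Session mergedWith(final Session other) {
            return new Session(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

Applied to the input above: emily's records at t1 and t2 = t1 + sessionGap/2 merge into the single session [t1, t2] with count 2, while jo's records at t1 and t4 are more than sessionGap apart, so they remain two single-record sessions.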
+    @Test
+    public void shouldReduceSessionWindows() throws Exception {
+        final long sessionGap = 1000L; // session inactivity gap, in milliseconds
+        final long maintainMillis = sessionGap * 3;
+
+        final long t1 = mockTime.milliseconds();
+        final List<KeyValue<String, String>> t1Messages = Arrays.asList(new KeyValue<>("bob", "start"),
+                                                                        new KeyValue<>("penny", "start"),
+                                                                        new KeyValue<>("jo", "pause"),
+                                                                        new KeyValue<>("emily", "pause"));
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                userSessionsStream,
+                t1Messages,
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                t1);
+
+        final long t2 = t1 + (sessionGap / 2);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                userSessionsStream,
+                Collections.singletonList(
+                        new KeyValue<>("emily", "resume")
+                ),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                t2);
+        final long t3 = t1 + sessionGap + 1;
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                userSessionsStream,
+                Arrays.asList(
+                        new KeyValue<>("bob", "pause"),
+                        new KeyValue<>("penny", "stop")
+                ),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                t3);
+
+        final long t4 = t3 + (sessionGap / 2);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                userSessionsStream,
+                Arrays.asList(
+                        new KeyValue<>("bob", "resume"), // bobs session continues
+                        new KeyValue<>("jo", "resume")   // jo's starts new session
+                ),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                t4);
+
+        final Map<Windowed<String>, String> results = new HashMap<>();
+        final CountDownLatch latch = new CountDownLatch(11); // one update per input record, plus the updates caused by the two session merges
+        final String userSessionsStore = "UserSessionsStore";
+        builder.stream(Serdes.String(), Serdes.String(), userSessionsStream)
+                .groupByKey(Serdes.String(), Serdes.String())
+                .reduce(new Reducer<String>() {
+                    @Override
+                    public String apply(final String value1, final String value2) {
+                        return value1 + ":" + value2;
+                    }
+                }, SessionWindows.with(sessionGap).until(maintainMillis), userSessionsStore)
+                .foreach(new ForeachAction<Windowed<String>, String>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final String value) {
+                        results.put(key, value);
+                        latch.countDown();
+                    }
+                });
+
+        startStreams();
+        latch.await(30, TimeUnit.SECONDS);
+        final ReadOnlySessionStore<String, String> sessionStore
+                = kafkaStreams.store(userSessionsStore, QueryableStoreTypes.<String, String>sessionStore());
+
+        // verify correct data received
+        assertThat(results.get(new Windowed<>("bob", new TimeWindow(t1, t1))), equalTo("start"));
+        assertThat(results.get(new Windowed<>("penny", new TimeWindow(t1, t1))), equalTo("start"));
+        assertThat(results.get(new Windowed<>("jo", new TimeWindow(t1, t1))), equalTo("pause"));
+        assertThat(results.get(new Windowed<>("jo", new TimeWindow(t4, t4))), equalTo("resume"));
+        assertThat(results.get(new Windowed<>("emily", new TimeWindow(t1, t2))), equalTo("pause:resume"));
+        assertThat(results.get(new Windowed<>("bob", new TimeWindow(t3, t4))), equalTo("pause:resume"));
+        assertThat(results.get(new Windowed<>("penny", new TimeWindow(t3, t3))), equalTo("stop"));
+
+        // verify can query data via IQ
+        final KeyValueIterator<Windowed<String>, String> bob = sessionStore.fetch("bob");
+        assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new TimeWindow(t1, t1)), "start")));
+        assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new TimeWindow(t3, t4)), "pause:resume")));
+        assertFalse(bob.hasNext());
+    }
+
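The tail of this test doubles as a template for interactive queries: any session store created by count/reduce/aggregate can be fetched by name from a running KafkaStreams instance and iterated per key. A usage sketch built from the same calls the test makes (the helper name is hypothetical):

    // Hypothetical helper: print every session recorded for `key` in a session store.
    static void printSessions(final KafkaStreams streams, final String storeName, final String key) {
        final ReadOnlySessionStore<String, String> store =
                streams.store(storeName, QueryableStoreTypes.<String, String>sessionStore());
        try (final KeyValueIterator<Windowed<String>, String> sessions = store.fetch(key)) {
            while (sessions.hasNext()) {
                final KeyValue<Windowed<String>, String> session = sessions.next();
                System.out.println(key + " @ [" + session.key.window().start() + ", "
                        + session.key.window().end() + "] -> " + session.value);
            }
        }
    }

Closing the iterator matters: KeyValueIterator holds underlying store resources, which is why the sketch uses try-with-resources.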
 
     private void produceMessages(final long timestamp)
         throws ExecutionException, InterruptedException {
@@ -439,7 +640,9 @@ public class KStreamAggregationIntegrationTest {
     private void createTopics() {
         streamOneInput = "stream-one-" + testNo;
         outputTopic = "output-" + testNo;
+        userSessionsStream = userSessionsStream + "-" + testNo;
         CLUSTER.createTopic(streamOneInput, 3, 1);
+        CLUSTER.createTopic(userSessionsStream);
         CLUSTER.createTopic(outputTopic);
     }
 

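Taken together, the test files above and below exercise the full session-window surface added to KGroupedStream. Reconstructing the signatures from the call sites alone (return types inferred from the .toStream()/.foreach() chaining; generic bounds and modifiers simplified, so treat this as a sketch rather than the authoritative API):

    // Session-windowed operations on KGroupedStream<K, V>, as used by these tests.
    KTable<Windowed<K>, Long> count(SessionWindows sessionWindows, String storeName);

    KTable<Windowed<K>, V> reduce(Reducer<V> reducer, SessionWindows sessionWindows, String storeName);

    <T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer,
                                         Aggregator<K, V, T> aggregator,
                                         Merger<K, T> sessionMerger,
                                         SessionWindows sessionWindows,
                                         Serde<T> aggValueSerde,
                                         String storeName);

The Merger is the piece unique to session windows: when a new record bridges two previously separate sessions, their per-session aggregates must be combined, which plain windowed aggregation never needs.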
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 292f229..82e8c6c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -18,25 +18,43 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.Merger;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
 public class KGroupedStreamImplTest {
 
+    private static final String TOPIC = "topic";
+    private final KStreamBuilder builder = new KStreamBuilder();
     private KGroupedStream<String, String> groupedStream;
 
     @Before
     public void before() {
-        final KStream<String, String> stream = new KStreamBuilder().stream("topic");
-        groupedStream = stream.groupByKey();
+        final KStream<String, String> stream = builder.stream(Serdes.String(), Serdes.String(), TOPIC);
+        groupedStream = stream.groupByKey(Serdes.String(), Serdes.String());
     }
 
     @Test(expected = NullPointerException.class)
@@ -63,7 +81,7 @@ public class KGroupedStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullWindowsWithWindowedReduce() throws Exception {
-        groupedStream.reduce(MockReducer.STRING_ADDER, null, "store");
+        groupedStream.reduce(MockReducer.STRING_ADDER, (Windows) null, "store");
     }
 
     @Test(expected = NullPointerException.class)
@@ -114,4 +132,209 @@ public class KGroupedStreamImplTest {
         StateStoreSupplier storeSupplier = null;
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, TimeWindows.of(10), storeSupplier);
     }
+
+    @Test
+    public void shouldAggregateSessionWindows() throws Exception {
+        final Map<Windowed<String>, Integer> results = new HashMap<>();
+        groupedStream.aggregate(new Initializer<Integer>() {
+            @Override
+            public Integer apply() {
+                return 0;
+            }
+        }, new Aggregator<String, String, Integer>() {
+            @Override
+            public Integer apply(final String aggKey, final String value, final Integer aggregate) {
+                return aggregate + 1;
+            }
+        }, new Merger<String, Integer>() {
+            @Override
+            public Integer apply(final String aggKey, final Integer aggOne, final Integer aggTwo) {
+                return aggOne + aggTwo;
+            }
+        }, SessionWindows.with(30), Serdes.Integer(), "session-store")
+                .foreach(new ForeachAction<Windowed<String>, Integer>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final Integer value) {
+                        results.put(key, value);
+                    }
+                });
+
+        final KStreamTestDriver driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
+        driver.setTime(10);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(15);
+        driver.process(TOPIC, "2", "2");
+        driver.setTime(30);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(70);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(90);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(100);
+        driver.process(TOPIC, "1", "1");
+        driver.flushState();
+        assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new TimeWindow(10, 30))));
+        assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new TimeWindow(15, 15))));
+        assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new TimeWindow(70, 100))));
+    }
+
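The three callbacks in the test above map directly onto the new session aggregate: the Initializer seeds each session, the Aggregator folds each record in, and the Merger combines two session aggregates when a record bridges them. Since all three are single-method interfaces, the same topology can be written with Java 8 lambdas (a stylistic rewrite only; semantics are identical to the anonymous classes above):

    // The aggregate call from shouldAggregateSessionWindows, rewritten with lambdas.
    groupedStream.aggregate(
            () -> 0,                                      // Initializer: seed each new session
            (aggKey, value, aggregate) -> aggregate + 1,  // Aggregator: count each record
            (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,  // Merger: combine merged sessions
            SessionWindows.with(30),
            Serdes.Integer(),
            "session-store");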
+    @Test
+    public void shouldCountSessionWindows() throws Exception {
+        final Map<Windowed<String>, Long> results = new HashMap<>();
+        groupedStream.count(SessionWindows.with(30), "session-store")
+                .foreach(new ForeachAction<Windowed<String>, Long>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final Long value) {
+                        results.put(key, value);
+                    }
+                });
+        final KStreamTestDriver driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
+        driver.setTime(10);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(15);
+        driver.process(TOPIC, "2", "2");
+        driver.setTime(30);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(70);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(90);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(100);
+        driver.process(TOPIC, "1", "1");
+        driver.flushState();
+        assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new TimeWindow(10, 30))));
+        assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new TimeWindow(15, 15))));
+        assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new TimeWindow(70, 100))));
+    }
+
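Tracing the expected sessions makes the assertions above concrete: with a 30 ms inactivity gap, key "1"'s records at 10 and 30 are 20 ms apart and merge into one session [10, 30] with count 2; the record at 70 arrives 40 ms later, so it opens a new session, which the records at 90 and 100 then extend to [70, 100] with count 3; key "2" has a single record, giving the point session [15, 15].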
+    @Test
+    public void shouldReduceSessionWindows() throws Exception {
+        final Map<Windowed<String>, String> results = new HashMap<>();
+        groupedStream.reduce(
+                new Reducer<String>() {
+                    @Override
+                    public String apply(final String value1, final String value2) {
+                        return value1 + ":" + value2;
+                    }
+                }, SessionWindows.with(30),
+                "session-store")
+                .foreach(new ForeachAction<Windowed<String>, String>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final String value) {
+                        results.put(key, value);
+                    }
+                });
+        final KStreamTestDriver driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
+        driver.setTime(10);
+        driver.process(TOPIC, "1", "A");
+        driver.setTime(15);
+        driver.process(TOPIC, "2", "Z");
+        driver.setTime(30);
+        driver.process(TOPIC, "1", "B");
+        driver.setTime(70);
+        driver.process(TOPIC, "1", "A");
+        driver.setTime(90);
+        driver.process(TOPIC, "1", "B");
+        driver.setTime(100);
+        driver.process(TOPIC, "1", "C");
+        driver.flushState();
+        assertEquals("A:B", results.get(new Windowed<>("1", new TimeWindow(10, 30))));
+        assertEquals("Z", results.get(new Windowed<>("2", new TimeWindow(15, 15))));
+        assertEquals("A:B:C", results.get(new Windowed<>("1", new TimeWindow(70, 100))));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAcceptNullReducerWhenReducingSessionWindows() throws Exception {
+        groupedStream.reduce(null, SessionWindows.with(10), "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAcceptNullSessionWindowsWhenReducingSessionWindows() throws Exception {
+        groupedStream.reduce(MockReducer.STRING_ADDER, (SessionWindows) null, "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAcceptNullStoreNameWhenReducingSessionWindows() throws Exception {
+        groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), (String) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() throws Exception {
+        groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), (StateStoreSupplier<SessionStore>) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() throws Exception {
+        groupedStream.aggregate(null, MockAggregator.STRING_ADDER, new Merger<String, String>() {
+            @Override
+            public String apply(final String aggKey, final String aggOne, final String aggTwo) {
+                return null;
+            }
+        }, SessionWindows.with(10), Serdes.String(), "storeName");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() throws Exception {
+        groupedStream.aggregate(MockInitializer.STRING_INIT, null, new Merger<String, String>() {
+            @Override
+            public String apply(final String aggKey, final String aggOne, final String aggTwo) {
+                return null;
+            }
+        }, SessionWindows.with(10), Serdes.String(), "storeName");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() throws Exception {
+        groupedStream.aggregate(MockInitializer.STRING_INIT,
+                                MockAggregator.STRING_ADDER,
+                                null,
+                                SessionWindows.with(10),
+                                Serdes.String(),
+                                "storeName");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() throws Exception {
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, new Merger<String, String>() {
+            @Override
+            public String apply(final String aggKey, final String aggOne, final String aggTwo) {
+                return null;
+            }
+        }, null, Serdes.String(), "storeName");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAcceptNullStoreNameWhenAggregatingSessionWindows() throws Exception {
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, new Merger<String, String>() {
+            @Override
+            public String apply(final String aggKey, final String aggOne, final String aggTwo) {
+                return null;
+            }
+        }, SessionWindows.with(10), Serdes.String(), (String) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAcceptNullStateStoreSupplierWhenAggregatingSessionWindows() throws Exception {
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, new Merger<String, String>() {
+            @Override
+            public String apply(final String aggKey, final String aggOne, final String aggTwo) {
+                return null;
+            }
+        }, SessionWindows.with(10), Serdes.String(), (StateStoreSupplier<SessionStore>) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAcceptNullSessionWindowsWhenCountingSessionWindows() throws Exception {
+        groupedStream.count((SessionWindows) null, "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAcceptNullStoreNameWhenCountingSessionWindows() throws Exception {
+        groupedStream.count(SessionWindows.with(90), (String) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAcceptNullStateStoreSupplierWhenCountingSessionWindows() throws Exception {
+        groupedStream.count(SessionWindows.with(90), (StateStoreSupplier<SessionStore>) null);
+    }
 }
\ No newline at end of file
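The block of @Test(expected = NullPointerException.class) cases above pins down eager argument validation for every new session overload. A minimal sketch of the guard-clause pattern those tests imply (Objects.requireNonNull and the messages are assumptions; the actual KGroupedStreamImpl checks may differ):

    import java.util.Objects;

    // Illustrative guard clauses matching the NPE tests above; names and
    // messages are assumptions, not the actual KGroupedStreamImpl code.
    final class SessionWindowArgs {
        static void check(final Object reducer, final Object sessionWindows, final Object storeName) {
            Objects.requireNonNull(reducer, "reducer can't be null");
            Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
            Objects.requireNonNull(storeName, "storeName can't be null");
        }
    }

Validating up front, rather than letting a null surface deep inside topology construction, keeps the failure at the call site that caused it.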

