kafka-commits mailing list archives

From guozh...@apache.org
Subject [4/5] kafka git commit: KAFKA-4490: Add Global Table support to Kafka Streams
Date Thu, 12 Jan 2017 19:46:08 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
new file mode 100644
index 0000000..927f62b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.LockException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.streams.processor.internals.ProcessorStateManager.CHECKPOINT_FILE_NAME;
+
+/**
+ * This class is responsible for the initialization, restoration, flushing, and closing
+ * of Global State Stores. There is only ever one instance of this class per application instance.
+ */
+public class GlobalStateManagerImpl implements GlobalStateManager {
+    private static final int MAX_LOCK_ATTEMPTS = 5;
+    private static final Logger log = LoggerFactory.getLogger(GlobalStateManagerImpl.class);
+
+    private final ProcessorTopology topology;
+    private final Consumer<byte[], byte[]> consumer;
+    private final StateDirectory stateDirectory;
+    private final Map<String, StateStore> stores = new LinkedHashMap<>();
+    private final File baseDir;
+    private final OffsetCheckpoint checkpoint;
+    private final Set<String> globalStoreNames = new HashSet<>();
+    private Map<TopicPartition, Long> checkpointableOffsets;
+
+    public GlobalStateManagerImpl(final ProcessorTopology topology,
+                                  final Consumer<byte[], byte[]> consumer,
+                                  final StateDirectory stateDirectory) {
+        this.topology = topology;
+        this.consumer = consumer;
+        this.stateDirectory = stateDirectory;
+        this.baseDir = stateDirectory.globalStateDir();
+        this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
+    }
+
+    @Override
+    public Set<String> initialize(final InternalProcessorContext processorContext) {
+        try {
+            if (!stateDirectory.lockGlobalState(MAX_LOCK_ATTEMPTS)) {
+                throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir));
+            }
+        } catch (IOException e) {
+            throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir), e);
+        }
+
+        try {
+            this.checkpointableOffsets = new HashMap<>(checkpoint.read());
+            checkpoint.delete();
+        } catch (IOException e) {
+            try {
+                stateDirectory.unlockGlobalState();
+            } catch (IOException e1) {
+                log.error("Failed to unlock the global state directory", e1);
+            }
+            throw new StreamsException("Failed to read checkpoints for global state stores", e);
+        }
+
+        final List<StateStore> stateStores = topology.globalStateStores();
+        for (final StateStore stateStore : stateStores) {
+            globalStoreNames.add(stateStore.name());
+            stateStore.init(processorContext, stateStore);
+        }
+        return Collections.unmodifiableSet(globalStoreNames);
+    }
+
+    @Override
+    public StateStore getGlobalStore(final String name) {
+        return stores.get(name);
+    }
+
+    @Override
+    public StateStore getStore(final String name) {
+        return getGlobalStore(name);
+    }
+
+    public File baseDir() {
+        return baseDir;
+    }
+
+    @Override
+    public void register(final StateStore store,
+                         final boolean ignored,
+                         final StateRestoreCallback stateRestoreCallback) {
+        if (stores.containsKey(store.name())) {
+            throw new IllegalArgumentException(String.format("Global Store %s has already been registered", store.name()));
+        }
+
+        if (!globalStoreNames.contains(store.name())) {
+            throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
+        }
+
+        if (stateRestoreCallback == null) {
+            throw new IllegalArgumentException(String.format("The stateRestoreCallback provided for store %s was null", store.name()));
+        }
+
+        log.info("restoring state for global store {}", store.name());
+        final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
+        consumer.assign(topicPartitions);
+        final Map<TopicPartition, Long> highWatermarks = consumer.endOffsets(topicPartitions);
+        try {
+            restoreState(stateRestoreCallback, topicPartitions, highWatermarks);
+            stores.put(store.name(), store);
+        } finally {
+            consumer.assign(Collections.<TopicPartition>emptyList());
+        }
+    }
+
+    private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
+        final String sourceTopic = topology.sourceStoreToSourceTopic().get(store.name());
+        final List<PartitionInfo> partitionInfos = consumer.partitionsFor(sourceTopic);
+        if (partitionInfos == null || partitionInfos.isEmpty()) {
+            throw new StreamsException(String.format("There are no partitions available for topic %s when initializing global store %s", sourceTopic, store.name()));
+        }
+
+        final List<TopicPartition> topicPartitions = new ArrayList<>();
+        for (PartitionInfo partition : partitionInfos) {
+            topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
+        }
+        return topicPartitions;
+    }
+
+    private void restoreState(final StateRestoreCallback stateRestoreCallback,
+                              final List<TopicPartition> topicPartitions,
+                              final Map<TopicPartition, Long> highWatermarks) {
+        for (final TopicPartition topicPartition : topicPartitions) {
+            consumer.assign(Collections.singletonList(topicPartition));
+            final Long checkpoint = checkpointableOffsets.get(topicPartition);
+            if (checkpoint != null) {
+                consumer.seek(topicPartition, checkpoint);
+            } else {
+                consumer.seekToBeginning(Collections.singletonList(topicPartition));
+            }
+
+            long offset = consumer.position(topicPartition);
+            final Long highWatermark = highWatermarks.get(topicPartition);
+
+            while (offset < highWatermark) {
+                final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+                for (ConsumerRecord<byte[], byte[]> record : records) {
+                    offset = record.offset() + 1;
+                    stateRestoreCallback.restore(record.key(), record.value());
+                }
+            }
+            checkpointableOffsets.put(topicPartition, offset);
+        }
+    }
+
+    @Override
+    public void flush(final InternalProcessorContext context) {
+        log.debug("Flushing all global stores registered in the state manager");
+        for (StateStore store : this.stores.values()) {
+            try {
+                log.trace("Flushing global store={}", store.name());
+                store.flush();
+            } catch (Exception e) {
+                throw new ProcessorStateException(String.format("Failed to flush global state store %s", store.name()), e);
+            }
+        }
+    }
+
+    @Override
+    public void close(final Map<TopicPartition, Long> offsets) throws IOException {
+        try {
+            if (stores.isEmpty()) {
+                return;
+            }
+            final StringBuilder closeFailed = new StringBuilder();
+            for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
+                log.debug("Closing global storage engine {}", entry.getKey());
+                try {
+                    entry.getValue().close();
+                } catch (Exception e) {
+                    log.error("Failed to close global state store {}", entry.getKey(), e);
+                    closeFailed.append("Failed to close global state store:")
+                            .append(entry.getKey())
+                            .append(". Reason: ")
+                            .append(e.getMessage())
+                            .append("\n");
+                }
+            }
+            stores.clear();
+            if (closeFailed.length() > 0) {
+                throw new ProcessorStateException("Exceptions caught during close of 1 or more global state stores\n" + closeFailed);
+            }
+            writeCheckpoints(offsets);
+        } finally {
+            stateDirectory.unlockGlobalState();
+        }
+    }
+
+    private void writeCheckpoints(final Map<TopicPartition, Long> offsets) {
+        if (!offsets.isEmpty()) {
+            checkpointableOffsets.putAll(offsets);
+            try {
+                checkpoint.write(checkpointableOffsets);
+            } catch (IOException e) {
+                log.warn("failed to write offsets checkpoint for global stores", e);
+            }
+        }
+    }
+
+    @Override
+    public Map<TopicPartition, Long> checkpointedOffsets() {
+        return Collections.unmodifiableMap(checkpointableOffsets);
+    }
+}
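
For reference, the restoreState() loop above is the standard changelog-replay pattern: assign a single partition, seek to the checkpointed offset (or to the beginning when there is none), then poll until the end offset captured before the loop is reached. A minimal standalone sketch of that pattern, assuming a generic consumer, partition, checkpoint value, and restore callback (the class and method names here are illustrative, not part of this patch):

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreCallback;

import java.util.Collections;

final class ChangelogReplaySketch {
    // Replays one changelog partition into a state store via the callback.
    static void replay(final Consumer<byte[], byte[]> consumer,
                       final TopicPartition partition,
                       final Long checkpointedOffset,
                       final StateRestoreCallback restoreCallback) {
        consumer.assign(Collections.singletonList(partition));
        if (checkpointedOffset != null) {
            consumer.seek(partition, checkpointedOffset);                    // resume from checkpoint
        } else {
            consumer.seekToBeginning(Collections.singletonList(partition));  // full replay
        }
        // Capture the end offset once: records appended afterwards are handled
        // by the regular update loop, not by restoration.
        final long highWatermark =
                consumer.endOffsets(Collections.singletonList(partition)).get(partition);
        long offset = consumer.position(partition);
        while (offset < highWatermark) {
            for (final ConsumerRecord<byte[], byte[]> record : consumer.poll(100)) {
                offset = record.offset() + 1;
                restoreCallback.restore(record.key(), record.value());
            }
        }
    }
}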

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
new file mode 100644
index 0000000..9723f3c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Updates the state for all Global State Stores.
+ */
+public class GlobalStateUpdateTask implements GlobalStateMaintainer {
+
+    private static class SourceNodeAndDeserializer {
+        private final SourceNode sourceNode;
+        private final RecordDeserializer deserializer;
+
+        SourceNodeAndDeserializer(final SourceNode sourceNode,
+                                  final RecordDeserializer deserializer) {
+            this.sourceNode = sourceNode;
+            this.deserializer = deserializer;
+        }
+    }
+
+    private final ProcessorTopology topology;
+    private final InternalProcessorContext processorContext;
+    private final Map<TopicPartition, Long> offsets = new HashMap<>();
+    private final Map<String, SourceNodeAndDeserializer> deserializers = new HashMap<>();
+    private final GlobalStateManager stateMgr;
+
+    public GlobalStateUpdateTask(final ProcessorTopology topology,
+                                 final InternalProcessorContext processorContext,
+                                 final GlobalStateManager stateMgr) {
+        this.topology = topology;
+        this.stateMgr = stateMgr;
+        this.processorContext = processorContext;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Map<TopicPartition, Long> initialize() {
+        final Set<String> storeNames = stateMgr.initialize(processorContext);
+        final Map<String, String> storeNameToTopic = topology.sourceStoreToSourceTopic();
+        for (final String storeName : storeNames) {
+            final String sourceTopic = storeNameToTopic.get(storeName);
+            final SourceNode source = topology.source(sourceTopic);
+            deserializers.put(sourceTopic, new SourceNodeAndDeserializer(source, new SourceNodeRecordDeserializer(source)));
+        }
+        initTopology();
+        processorContext.initialized();
+        return stateMgr.checkpointedOffsets();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void update(final ConsumerRecord<byte[], byte[]> record) {
+        final SourceNodeAndDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic());
+        final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.deserializer.deserialize(record);
+        final ProcessorRecordContext recordContext =
+                new ProcessorRecordContext(deserialized.timestamp(),
+                                           deserialized.offset(),
+                                           deserialized.partition(),
+                                           deserialized.topic());
+        processorContext.setRecordContext(recordContext);
+        processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode);
+        sourceNodeAndDeserializer.sourceNode.process(deserialized.key(), deserialized.value());
+        offsets.put(new TopicPartition(record.topic(), record.partition()), deserialized.offset() + 1);
+    }
+
+    @Override
+    public void flushState() {
+        stateMgr.flush(processorContext);
+    }
+
+    @Override
+    public void close() throws IOException {
+        stateMgr.close(offsets);
+    }
+
+    private void initTopology() {
+        for (ProcessorNode node : this.topology.processors()) {
+            processorContext.setCurrentNode(node);
+            try {
+                node.init(this.processorContext);
+            } finally {
+                processorContext.setCurrentNode(null);
+            }
+        }
+    }
+}
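
One detail worth noting in update() above: it records deserialized.offset() + 1, i.e. the next offset to read, so the seek() performed on restart (see StateConsumer.initialize() in GlobalStreamThread below) resumes without replaying or skipping anything. A tiny sketch of that contract using MockConsumer; the topic name and offsets are assumptions for the example:

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;

final class CheckpointContractSketch {
    public static void main(final String[] args) {
        final TopicPartition tp = new TopicPartition("global-topic", 0);   // assumed topic
        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
        consumer.assign(Collections.singletonList(tp));

        final long lastApplied = 41L;          // offset of the last record passed to update()
        consumer.seek(tp, lastApplied + 1);    // the value the task checkpoints

        // position() now reports 42: the first record NOT yet applied to the
        // store, so a restart resumes exactly where the update loop left off.
        System.out.println(consumer.position(tp));
    }
}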

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
new file mode 100644
index 0000000..b4e15f2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * This is the thread responsible for keeping all Global State Stores updated.
+ * It delegates most of the work to the internal StateConsumer class.
+ */
+public class GlobalStreamThread extends Thread {
+
+    private static final Logger log = LoggerFactory.getLogger(GlobalStreamThread.class);
+    private final StreamsConfig config;
+    private final Consumer<byte[], byte[]> consumer;
+    private final StateDirectory stateDirectory;
+    private final Time time;
+    private final ThreadCache cache;
+    private final StreamsMetrics streamsMetrics;
+    private final ProcessorTopology topology;
+    private volatile boolean running = false;
+    private volatile StreamsException startupException;
+
+    public GlobalStreamThread(final ProcessorTopology topology,
+                              final StreamsConfig config,
+                              final Consumer<byte[], byte[]> globalConsumer,
+                              final StateDirectory stateDirectory,
+                              final Metrics metrics,
+                              final Time time,
+                              final String clientId) {
+        super("GlobalStreamThread");
+        this.topology = topology;
+        this.config = config;
+        this.consumer = globalConsumer;
+        this.stateDirectory = stateDirectory;
+        this.time = time;
+        final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
+                (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + 1));
+        final String threadClientId = clientId + "-" + getName();
+        this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId, Collections.singletonMap("client-id", threadClientId));
+        this.cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
+    }
+
+    static class StateConsumer {
+        private final Consumer<byte[], byte[]> consumer;
+        private final GlobalStateMaintainer stateMaintainer;
+        private final Time time;
+        private final long pollMs;
+        private final long flushInterval;
+
+        private long lastFlush;
+
+        StateConsumer(final Consumer<byte[], byte[]> consumer,
+                      final GlobalStateMaintainer stateMaintainer,
+                      final Time time,
+                      final long pollMs,
+                      final long flushInterval) {
+            this.consumer = consumer;
+            this.stateMaintainer = stateMaintainer;
+            this.time = time;
+            this.pollMs = pollMs;
+            this.flushInterval = flushInterval;
+        }
+
+        void initialize() {
+            final Map<TopicPartition, Long> partitionOffsets = stateMaintainer.initialize();
+            consumer.assign(partitionOffsets.keySet());
+            for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+                consumer.seek(entry.getKey(), entry.getValue());
+            }
+            lastFlush = time.milliseconds();
+        }
+
+        void pollAndUpdate() {
+            final ConsumerRecords<byte[], byte[]> received = consumer.poll(pollMs);
+            for (ConsumerRecord<byte[], byte[]> record : received) {
+                stateMaintainer.update(record);
+            }
+            final long now = time.milliseconds();
+            if (flushInterval >= 0 && now >= lastFlush + flushInterval) {
+                stateMaintainer.flushState();
+                lastFlush = now;
+            }
+        }
+
+        public void close() throws IOException {
+            // just log an error if the consumer throws an exception during close
+            // so we can always attempt to close the state stores.
+            try {
+                consumer.close();
+            } catch (Exception e) {
+                log.error("Failed to cleanly close GlobalStreamThread consumer", e);
+            }
+
+            stateMaintainer.close();
+        }
+    }
+
+    @Override
+    public void run() {
+        final StateConsumer stateConsumer = initialize();
+        if (stateConsumer == null) {
+            return;
+        }
+
+        try {
+            while (running) {
+                stateConsumer.pollAndUpdate();
+            }
+            log.debug("Shutting down GlobalStreamThread at user request");
+        } finally {
+            try {
+                stateConsumer.close();
+            } catch (IOException e) {
+                log.error("Failed to cleanly shutdown GlobalStreamThread", e);
+            }
+        }
+    }
+
+    private StateConsumer initialize() {
+        try {
+            final GlobalStateManager stateMgr = new GlobalStateManagerImpl(topology, consumer, stateDirectory);
+            final StateConsumer stateConsumer
+                    = new StateConsumer(consumer,
+                                        new GlobalStateUpdateTask(topology,
+                                                                  new GlobalProcessorContextImpl(
+                                                                          config,
+                                                                          stateMgr,
+                                                                          streamsMetrics,
+                                                                          cache),
+                                                                  stateMgr),
+                                        time,
+                                        config.getLong(StreamsConfig.POLL_MS_CONFIG),
+                                        config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
+            stateConsumer.initialize();
+            running = true;
+            return stateConsumer;
+        } catch (StreamsException e) {
+            startupException = e;
+        } catch (Exception e) {
+            startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", e);
+        }
+        return null;
+    }
+
+    @Override
+    public synchronized void start() {
+        super.start();
+        while (!running) {
+            Utils.sleep(1);
+            if (startupException != null) {
+                throw startupException;
+            }
+        }
+    }
+
+    public void close() {
+        running = false;
+    }
+
+    public boolean stillRunning() {
+        return running;
+    }
+}
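
Note that start() above deliberately busy-waits until the run loop is entered, so initialization failures surface synchronously to the caller as a StreamsException instead of dying silently on the background thread. A hypothetical call site, assuming this sketch lives in the same package and that the constructor arguments already exist in the caller:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;

final class GlobalThreadStartupSketch {
    static GlobalStreamThread startGlobalThread(final ProcessorTopology topology,
                                                final StreamsConfig config,
                                                final Consumer<byte[], byte[]> globalConsumer,
                                                final StateDirectory stateDirectory,
                                                final Metrics metrics,
                                                final Time time,
                                                final String clientId) {
        final GlobalStreamThread thread = new GlobalStreamThread(
                topology, config, globalConsumer, stateDirectory, metrics, time, clientId);
        try {
            thread.start();   // blocks until the run loop starts, or rethrows the startup failure
        } catch (final StreamsException fatal) {
            thread.close();   // nothing is running; propagate the failure to the caller
            throw fatal;
        }
        return thread;
    }
}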

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 016964b..c78ed09 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -43,8 +43,14 @@ public interface InternalProcessorContext extends ProcessorContext {
     void setCurrentNode(ProcessorNode currentNode);
 
     ProcessorNode currentNode();
+
     /**
      * Get the thread-global cache
      */
     ThreadCache getCache();
+
+    /**
+     * Mark this context as initialized
+     */
+    void initialized();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 6496b88..0224cdc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -114,7 +114,7 @@ public class PartitionGroup {
         RecordQueue recordQueue = partitionQueues.get(partition);
 
         int oldSize = recordQueue.size();
-        int newSize = recordQueue.addRawRecords(rawRecords, timestampExtractor);
+        int newSize = recordQueue.addRawRecords(rawRecords);
 
         // add this record queue to be considered for processing in the future if it was empty before
         if (oldSize == 0 && newSize > 0) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 31bb7c6..f761f16 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -17,73 +17,34 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
-import java.io.File;
 import java.util.List;
-import java.util.Map;
 
-public class ProcessorContextImpl implements InternalProcessorContext, RecordCollector.Supplier {
+public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
 
-    public static final String NONEXIST_TOPIC = "__null_topic__";
-
-    private final TaskId id;
     private final StreamTask task;
-    private final StreamsMetrics metrics;
     private final RecordCollector collector;
-    private final ProcessorStateManager stateMgr;
-
-    private final StreamsConfig config;
-    private final Serde<?> keySerde;
-    private final Serde<?> valSerde;
-    private final ThreadCache cache;
-    private boolean initialized;
-    private RecordContext recordContext;
-    private ProcessorNode currentNode;
 
-    public ProcessorContextImpl(TaskId id,
-                                StreamTask task,
-                                StreamsConfig config,
-                                RecordCollector collector,
-                                ProcessorStateManager stateMgr,
-                                StreamsMetrics metrics,
-                                final ThreadCache cache) {
-        this.id = id;
+    ProcessorContextImpl(final TaskId id,
+                         final StreamTask task,
+                         final StreamsConfig config,
+                         final RecordCollector collector,
+                         final ProcessorStateManager stateMgr,
+                         final StreamsMetrics metrics,
+                         final ThreadCache cache) {
+        super(id, task.applicationId(), config, metrics, stateMgr, cache);
         this.task = task;
-        this.metrics = metrics;
         this.collector = collector;
-        this.stateMgr = stateMgr;
-
-        this.config = config;
-        this.keySerde = config.keySerde();
-        this.valSerde = config.valueSerde();
-        this.cache = cache;
-        this.initialized = false;
-    }
-
-    public void initialized() {
-        this.initialized = true;
     }
 
     public ProcessorStateManager getStateMgr() {
-        return stateMgr;
-    }
-
-    @Override
-    public TaskId taskId() {
-        return id;
-    }
-
-    @Override
-    public String applicationId() {
-        return task.applicationId();
+        return (ProcessorStateManager) stateManager;
     }
 
     @Override
@@ -91,142 +52,66 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
         return this.collector;
     }
 
-    @Override
-    public Serde<?> keySerde() {
-        return this.keySerde;
-    }
-
-    @Override
-    public Serde<?> valueSerde() {
-        return this.valSerde;
-    }
-
-    @Override
-    public File stateDir() {
-        return stateMgr.baseDir();
-    }
-
-    @Override
-    public StreamsMetrics metrics() {
-        return metrics;
-    }
-
-    /**
-     * @throws IllegalStateException if this method is called before {@link #initialized()}
-     */
-    @Override
-    public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
-        if (initialized)
-            throw new IllegalStateException("Can only create state stores during initialization.");
-
-        stateMgr.register(store, loggingEnabled, stateRestoreCallback);
-    }
-
     /**
      * @throws TopologyBuilderException if an attempt is made to access this state store from an unknown node
      */
     @Override
-    public StateStore getStateStore(String name) {
-        if (currentNode == null)
+    public StateStore getStateStore(final String name) {
+        if (currentNode() == null) {
             throw new TopologyBuilderException("Accessing from an unknown node");
-
-        if (!currentNode.stateStores.contains(name)) {
-            throw new TopologyBuilderException("Processor " + currentNode.name() + " has no access to StateStore " + name);
         }
 
-        return stateMgr.getStore(name);
-    }
-
-    @Override
-    public ThreadCache getCache() {
-        return cache;
-    }
-
-    /**
-     * @throws IllegalStateException if the task's record is null
-     */
-    @Override
-    public String topic() {
-        if (recordContext == null)
-            throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed");
-
-        String topic = recordContext.topic();
-
-        if (topic.equals(NONEXIST_TOPIC))
-            return null;
-        else
-            return topic;
-    }
-
-    /**
-     * @throws IllegalStateException if partition is null
-     */
-    @Override
-    public int partition() {
-        if (recordContext == null)
-            throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed");
-
-        return recordContext.partition();
-    }
-
-    /**
-     * @throws IllegalStateException if offset is null
-     */
-    @Override
-    public long offset() {
-        if (recordContext == null)
-            throw new IllegalStateException("This should not happen as offset() should only be called while a record is processed");
-
-        return recordContext.offset();
-    }
+        final StateStore global = stateManager.getGlobalStore(name);
+        if (global != null) {
+            return global;
+        }
 
-    /**
-     * @throws IllegalStateException if timestamp is null
-     */
-    @Override
-    public long timestamp() {
-        if (recordContext == null)
-            throw new IllegalStateException("This should not happen as timestamp() should only be called while a record is processed");
+        if (!currentNode().stateStores.contains(name)) {
+            throw new TopologyBuilderException("Processor " + currentNode().name() + " has no access to StateStore " + name);
+        }
 
-        return recordContext.timestamp();
+        return stateManager.getStore(name);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public <K, V> void forward(K key, V value) {
-        ProcessorNode previousNode = currentNode;
+    public <K, V> void forward(final K key, final V value) {
+        final ProcessorNode previousNode = currentNode();
         try {
-            for (ProcessorNode child : (List<ProcessorNode>) currentNode.children()) {
-                currentNode = child;
+            for (ProcessorNode child : (List<ProcessorNode>) currentNode().children()) {
+                setCurrentNode(child);
                 child.process(key, value);
             }
         } finally {
-            currentNode = previousNode;
+            setCurrentNode(previousNode);
         }
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public <K, V> void forward(K key, V value, int childIndex) {
-        ProcessorNode previousNode = currentNode;
-        final ProcessorNode child = (ProcessorNode<K, V>) currentNode.children().get(childIndex);
-        currentNode = child;
+    public <K, V> void forward(final K key, final V value, final int childIndex) {
+        final ProcessorNode previousNode = currentNode();
+        final ProcessorNode child = (ProcessorNode<K, V>) currentNode().children().get(childIndex);
+        setCurrentNode(child);
         try {
             child.process(key, value);
         } finally {
-            currentNode = previousNode;
+            setCurrentNode(previousNode);
         }
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public <K, V> void forward(K key, V value, String childName) {
-        for (ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode.children()) {
+    public <K, V> void forward(final K key, final V value, final String childName) {
+        for (ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode().children()) {
             if (child.name().equals(childName)) {
-                ProcessorNode previousNode = currentNode;
-                currentNode = child;
+                ProcessorNode previousNode = currentNode();
+                setCurrentNode(child);
                 try {
                     child.process(key, value);
                     return;
                 } finally {
-                    currentNode = previousNode;
+                    setCurrentNode(previousNode);
                 }
             }
         }
@@ -238,37 +123,8 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
     }
 
     @Override
-    public void schedule(long interval) {
+    public void schedule(final long interval) {
         task.schedule(interval);
     }
 
-    @Override
-    public Map<String, Object> appConfigs() {
-        return config.originals();
-    }
-
-    @Override
-    public Map<String, Object> appConfigsWithPrefix(String prefix) {
-        return config.originalsWithPrefix(prefix);
-    }
-
-    @Override
-    public void setRecordContext(final RecordContext recordContext) {
-        this.recordContext = recordContext;
-    }
-
-    @Override
-    public RecordContext recordContext() {
-        return this.recordContext;
-    }
-
-    @Override
-    public void setCurrentNode(final ProcessorNode currentNode) {
-        this.currentNode = currentNode;
-    }
-
-    @Override
-    public ProcessorNode currentNode() {
-        return currentNode;
-    }
 }
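
The getStateStore() change above resolves global stores before consulting the per-processor store list, so a processor can read a global table without being wired to it in the topology. A hypothetical processor illustrating this; the store name "global-products" is invented for the example:

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

class EnrichProcessor extends AbstractProcessor<String, String> {
    private KeyValueStore<String, String> products;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // Resolved through the global-store branch added above; no explicit
        // store-to-processor wiring is required for a global store.
        products = (KeyValueStore<String, String>) context.getStateStore("global-products");
    }

    @Override
    public void process(final String key, final String value) {
        final String product = products.get(key);   // point lookup against the global table
        context().forward(key, value + ":" + (product == null ? "unknown" : product));
    }
}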

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index f165ebf..b66e3df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -37,7 +37,7 @@ public class ProcessorNode<K, V> {
 
     private final String name;
     private final Processor<K, V> processor;
-    protected NodeMetrics nodeMetrics;
+    NodeMetrics nodeMetrics;
     private Time time;
 
     private K key;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index c81df6c..dca8192 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -46,7 +46,7 @@ import java.util.Set;
 
 import static java.util.Collections.singleton;
 
-public class ProcessorStateManager {
+public class ProcessorStateManager implements StateManager {
 
     private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class);
 
@@ -59,6 +59,7 @@ public class ProcessorStateManager {
     private final Map<String, TopicPartition> partitionForTopic;
     private final File baseDir;
     private final Map<String, StateStore> stores;
+    private final Map<String, StateStore> globalStores;
     private final Set<String> loggingEnabled;
     private final Consumer<byte[], byte[]> restoreConsumer;
     private final Map<TopicPartition, Long> restoredOffsets;
@@ -89,6 +90,7 @@ public class ProcessorStateManager {
             this.partitionForTopic.put(source.topic(), source);
         }
         this.stores = new LinkedHashMap<>();
+        this.globalStores = new HashMap<>();
         this.loggingEnabled = new HashSet<>();
         this.restoreConsumer = restoreConsumer;
         this.restoredOffsets = new HashMap<>();
@@ -321,6 +323,7 @@ public class ProcessorStateManager {
         return stores.get(name);
     }
 
+    @Override
     public void flush(final InternalProcessorContext context) {
         if (!this.stores.isEmpty()) {
             log.debug("{} Flushing all stores registered in the state manager", logPrefix);
@@ -342,6 +345,7 @@ public class ProcessorStateManager {
     /**
      * @throws IOException if any error happens when closing the state stores
      */
+    @Override
     public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
         try {
             // attempting to close the stores, just in case they
@@ -398,4 +402,14 @@ public class ProcessorStateManager {
 
         return partition == null ? defaultPartition : partition.partition();
     }
+
+    void registerGlobalStateStores(final List<StateStore> stateStores) {
+        for (StateStore stateStore : stateStores) {
+            globalStores.put(stateStore.name(), stateStore);
+        }
+    }
+
+    public StateStore getGlobalStore(final String name) {
+        return globalStores.get(name);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 9ccc252..1eff351 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -31,19 +31,22 @@ public class ProcessorTopology {
     private final List<StateStore> stateStores;
     private final Map<String, String> sourceStoreToSourceTopic;
     private final Map<StateStore, ProcessorNode> storeToProcessorNodeMap;
-
-    public ProcessorTopology(List<ProcessorNode> processorNodes,
-                             Map<String, SourceNode> sourceByTopics,
-                             Map<String, SinkNode> sinkByTopics,
-                             List<StateStore> stateStores,
-                             Map<String, String> sourceStoreToSourceTopic,
-                             Map<StateStore, ProcessorNode> storeToProcessorNodeMap) {
+    private final List<StateStore> globalStateStores;
+
+    public ProcessorTopology(final List<ProcessorNode> processorNodes,
+                             final Map<String, SourceNode> sourceByTopics,
+                             final Map<String, SinkNode> sinkByTopics,
+                             final List<StateStore> stateStores,
+                             final Map<String, String> sourceStoreToSourceTopic,
+                             final Map<StateStore, ProcessorNode> storeToProcessorNodeMap,
+                             final List<StateStore> globalStateStores) {
         this.processorNodes = Collections.unmodifiableList(processorNodes);
         this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics);
         this.sinkByTopics   = Collections.unmodifiableMap(sinkByTopics);
         this.stateStores = Collections.unmodifiableList(stateStores);
         this.sourceStoreToSourceTopic = sourceStoreToSourceTopic;
         this.storeToProcessorNodeMap = Collections.unmodifiableMap(storeToProcessorNodeMap);
+        this.globalStateStores = Collections.unmodifiableList(globalStateStores);
     }
 
     public Set<String> sourceTopics() {
@@ -86,6 +89,11 @@ public class ProcessorTopology {
         return storeToProcessorNodeMap;
     }
 
+    public List<StateStore> globalStateStores() {
+        return globalStateStores;
+    }
+
     private String childrenToString(String indent, List<ProcessorNode<?, ?>> children) {
         if (children == null || children.isEmpty()) {
             return "";

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
new file mode 100644
index 0000000..1129a71
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+interface RecordDeserializer {
+    ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index a40b9ff..077a4d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -19,15 +19,12 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
 
-import static java.lang.String.format;
 
 /**
  * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the
@@ -39,20 +36,26 @@ public class RecordQueue {
     private static final Logger log = LoggerFactory.getLogger(RecordQueue.class);
 
     private final SourceNode source;
+    private final TimestampExtractor timestampExtractor;
     private final TopicPartition partition;
     private final ArrayDeque<StampedRecord> fifoQueue;
     private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker;
+    private final RecordDeserializer recordDeserializer;
 
     private long partitionTime = TimestampTracker.NOT_KNOWN;
 
-    public RecordQueue(TopicPartition partition, SourceNode source) {
+    RecordQueue(final TopicPartition partition,
+                final SourceNode source,
+                final TimestampExtractor timestampExtractor) {
         this.partition = partition;
         this.source = source;
-
+        this.timestampExtractor = timestampExtractor;
         this.fifoQueue = new ArrayDeque<>();
         this.timeTracker = new MinTimestampTracker<>();
+        this.recordDeserializer = new SourceNodeRecordDeserializer(source);
     }
 
     /**
      * Returns the corresponding source node in the topology
      *
@@ -75,36 +78,13 @@ public class RecordQueue {
      * Add a batch of {@link ConsumerRecord} into the queue
      *
      * @param rawRecords the raw records
-     * @param timestampExtractor TimestampExtractor
      * @return the size of this queue
      */
-    public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords, TimestampExtractor timestampExtractor) {
+    public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
         for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
-            // deserialize the raw record, extract the timestamp and put into the queue
-            final Object key;
-            try {
-                key = source.deserializeKey(rawRecord.topic(), rawRecord.key());
-            } catch (Exception e) {
-                throw new StreamsException(format("Failed to deserialize key for record. topic=%s, partition=%d, offset=%d",
-                                                  rawRecord.topic(), rawRecord.partition(), rawRecord.offset()), e);
-            }
-
-            final Object value;
-            try {
-                value = source.deserializeValue(rawRecord.topic(), rawRecord.value());
-            } catch (Exception e) {
-                throw new StreamsException(format("Failed to deserialize value for record. topic=%s, partition=%d, offset=%d",
-                                                  rawRecord.topic(), rawRecord.partition(), rawRecord.offset()), e);
-            }
-
-            ConsumerRecord<Object, Object> record = new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(),
-                                                                         rawRecord.timestamp(), TimestampType.CREATE_TIME,
-                                                                         rawRecord.checksum(),
-                                                                         rawRecord.serializedKeySize(),
-                                                                         rawRecord.serializedValueSize(), key, value);
+            ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(rawRecord);
             long timestamp = timestampExtractor.extract(record, timeTracker.get());
-
-            log.trace("Source node {} extracted timestamp {} for record {} when adding to buffered queue", source.name(), timestamp, record);
+            log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record);
 
             // drop message if TS is invalid, i.e., negative
             if (timestamp < 0) {
@@ -112,7 +92,6 @@ public class RecordQueue {
             }
 
             StampedRecord stampedRecord = new StampedRecord(record, timestamp);
-
             fifoQueue.addLast(stampedRecord);
             timeTracker.addElement(stampedRecord);
         }
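
Since the TimestampExtractor is now a constructor dependency of RecordQueue rather than a per-batch argument, call sites build the queue once and then add raw batches without repeating the extractor (compare the PartitionGroup change above). A hypothetical call site, assuming the same package as RecordQueue and a SourceNode plus a polled batch in scope:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

final class RecordQueueUsageSketch {
    static int enqueue(final SourceNode source,
                       final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
        final RecordQueue queue = new RecordQueue(
                new TopicPartition("input-topic", 0),    // assumed topic/partition
                source,
                new WallclockTimestampExtractor());      // any TimestampExtractor will do
        return queue.addRawRecords(rawRecords);          // extractor is applied internally now
    }
}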

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
new file mode 100644
index 0000000..d70af25
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.streams.errors.StreamsException;
+
+import static java.lang.String.format;
+
+class SourceNodeRecordDeserializer implements RecordDeserializer {
+    private final SourceNode sourceNode;
+
+    SourceNodeRecordDeserializer(final SourceNode sourceNode) {
+        this.sourceNode = sourceNode;
+    }
+
+    @Override
+    public ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord) {
+        final Object key;
+        try {
+            key = sourceNode.deserializeKey(rawRecord.topic(), rawRecord.key());
+        } catch (Exception e) {
+            throw new StreamsException(format("Failed to deserialize key for record. topic=%s, partition=%d, offset=%d",
+                                              rawRecord.topic(), rawRecord.partition(), rawRecord.offset()), e);
+        }
+
+        final Object value;
+        try {
+            value = sourceNode.deserializeValue(rawRecord.topic(), rawRecord.value());
+        } catch (Exception e) {
+            throw new StreamsException(format("Failed to deserialize value for record. topic=%s, partition=%d, offset=%d",
+                                              rawRecord.topic(), rawRecord.partition(), rawRecord.offset()), e);
+        }
+
+        return new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(),
+                                    rawRecord.timestamp(), TimestampType.CREATE_TIME,
+                                    rawRecord.checksum(),
+                                    rawRecord.serializedKeySize(),
+                                    rawRecord.serializedValueSize(), key, value);
+
+    }
+}
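
Because deserialization now goes through the RecordDeserializer seam, alternative implementations can be swapped in, for example as a test double. A minimal sketch, assuming it lives in the same package as the interface; the UTF-8 decoding is invented for the example:

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

class StringRecordDeserializer implements RecordDeserializer {
    // Decodes keys and values as UTF-8 strings; handy for tests.
    @Override
    public ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord) {
        final Object key = rawRecord.key() == null
                ? null : new String(rawRecord.key(), StandardCharsets.UTF_8);
        final Object value = rawRecord.value() == null
                ? null : new String(rawRecord.value(), StandardCharsets.UTF_8);
        return new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(),
                                    rawRecord.offset(), key, value);
    }
}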

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 86bdc19..954c710 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -19,20 +19,17 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
-import java.io.File;
 import java.util.Collections;
 import java.util.Map;
 
-public class StandbyContextImpl implements InternalProcessorContext, RecordCollector.Supplier {
+class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
 
     private static final RecordCollector NO_OP_COLLECTOR = new RecordCollector() {
         @Override
@@ -61,51 +58,17 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle
         }
     };
 
-    private final TaskId id;
-    private final String applicationId;
-    private final StreamsMetrics metrics;
-    private final ProcessorStateManager stateMgr;
-
-    private final StreamsConfig config;
-    private final Serde<?> keySerde;
-    private final Serde<?> valSerde;
-    private final ThreadCache zeroSizedCache;
-
-    private boolean initialized;
-
-    public StandbyContextImpl(TaskId id,
-                              String applicationId,
-                              StreamsConfig config,
-                              ProcessorStateManager stateMgr,
-                              StreamsMetrics metrics) {
-        this.id = id;
-        this.applicationId = applicationId;
-        this.metrics = metrics;
-        this.stateMgr = stateMgr;
-
-        this.config = config;
-        this.keySerde = config.keySerde();
-        this.valSerde = config.valueSerde();
-        this.zeroSizedCache = new ThreadCache("zeroCache", 0, this.metrics);
-        this.initialized = false;
+    public StandbyContextImpl(final TaskId id,
+                              final String applicationId,
+                              final StreamsConfig config,
+                              final ProcessorStateManager stateMgr,
+                              final StreamsMetrics metrics) {
+        super(id, applicationId, config, metrics, stateMgr, new ThreadCache("zeroCache", 0, metrics));
     }
 
-    public void initialized() {
-        this.initialized = true;
-    }
 
-    public ProcessorStateManager getStateMgr() {
-        return stateMgr;
-    }
-
-    @Override
-    public TaskId taskId() {
-        return id;
-    }
-
-    @Override
-    public String applicationId() {
-        return applicationId;
+    StateManager getStateMgr() {
+        return stateManager;
     }
 
     @Override
@@ -113,37 +76,6 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle
         return NO_OP_COLLECTOR;
     }
 
-    @Override
-    public Serde<?> keySerde() {
-        return this.keySerde;
-    }
-
-    @Override
-    public Serde<?> valueSerde() {
-        return this.valSerde;
-    }
-
-    @Override
-    public File stateDir() {
-        return stateMgr.baseDir();
-    }
-
-    @Override
-    public StreamsMetrics metrics() {
-        return metrics;
-    }
-
-    /**
-     * @throws IllegalStateException
-     */
-    @Override
-    public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
-        if (initialized)
-            throw new IllegalStateException("Can only create state stores during initialization.");
-
-        stateMgr.register(store, loggingEnabled, stateRestoreCallback);
-    }
-
     /**
      * @throws UnsupportedOperationException
      */
@@ -152,11 +84,6 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle
         throw new UnsupportedOperationException("this should not happen: getStateStore() not supported in standby tasks.");
     }
 
-    @Override
-    public ThreadCache getCache() {
-        return zeroSizedCache;
-    }
-
     /**
      * @throws UnsupportedOperationException
      */
@@ -229,15 +156,6 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle
         throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
     }
 
-    @Override
-    public Map<String, Object> appConfigs() {
-        return config.originals();
-    }
-
-    @Override
-    public Map<String, Object> appConfigsWithPrefix(String prefix) {
-        return config.originalsWithPrefix(prefix);
-    }
 
     @Override
     public RecordContext recordContext() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index a48ec59..d264b26 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -47,6 +47,9 @@ public class StateDirectory {
     private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
     private final HashMap<TaskId, FileLock> locks = new HashMap<>();
 
+    private FileChannel globalStateChannel;
+    private FileLock globalStateLock;
+
     public StateDirectory(final String applicationId, final String stateDirConfig) {
         final File baseDir = new File(stateDirConfig);
         if (!baseDir.exists() && !baseDir.mkdirs()) {
@@ -75,6 +78,15 @@ public class StateDirectory {
         return taskDir;
     }
 
+    public File globalStateDir() {
+        final File dir = new File(stateDir, "global");
+        if (!dir.exists() && !dir.mkdir()) {
+            throw new ProcessorStateException(String.format("global state directory [%s] doesn't exist and couldn't be created",
+                                                            dir.getPath()));
+        }
+        return dir;
+    }
+
     /**
      * Get the lock for the {@link TaskId}s directory if it is available
      * @param taskId
@@ -100,6 +112,49 @@ public class StateDirectory {
             return false;
         }
 
+        final FileLock lock = tryLock(retry, channel);
+        if (lock != null) {
+            locks.put(taskId, lock);
+        }
+        return lock != null;
+    }
+
+    public boolean lockGlobalState(final int retry) throws IOException {
+        if (globalStateLock != null) {
+            return true;
+        }
+
+        final File lockFile = new File(globalStateDir(), LOCK_FILE_NAME);
+        final FileChannel channel;
+        try {
+            channel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+        } catch (NoSuchFileException e) {
+            // FileChannel.open(..) could throw NoSuchFileException when another thread is
+            // concurrently deleting the parent directory (i.e. the global state directory) of the
+            // lock file; in this case we return immediately, indicating that locking failed.
+            return false;
+        }
+        final FileLock fileLock = tryLock(retry, channel);
+        if (fileLock == null) {
+            channel.close();
+            return false;
+        }
+        globalStateChannel = channel;
+        globalStateLock = fileLock;
+        return true;
+    }
+
+    public void unlockGlobalState() throws IOException {
+        if (globalStateLock == null) {
+            return;
+        }
+        globalStateLock.release();
+        globalStateChannel.close();
+        globalStateLock = null;
+        globalStateChannel = null;
+    }
+
+    private FileLock tryLock(int retry, final FileChannel channel) throws IOException {
         FileLock lock = tryAcquireLock(channel);
         while (lock == null && retry > 0) {
             try {
@@ -110,12 +165,11 @@ public class StateDirectory {
             retry--;
             lock = tryAcquireLock(channel);
         }
-        if (lock != null) {
-            locks.put(taskId, lock);
-        }
-        return lock != null;
+        return lock;
     }
 
+
+
     /**
      * Unlock the state directory for the given {@link TaskId}
      * @param taskId
@@ -196,4 +250,7 @@ public class StateDirectory {
             return null;
         }
     }
+
+
+
 }
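
A hedged usage sketch of the new global-state locking methods (this is an
internal API, shown only for illustration; the state directory path and
retry count are arbitrary assumptions):

    import java.io.IOException;

    import org.apache.kafka.streams.processor.internals.StateDirectory;

    public class GlobalLockSketch {
        public static void main(String[] args) throws IOException {
            final StateDirectory stateDirectory = new StateDirectory("my-app", "/tmp/kafka-streams");
            if (!stateDirectory.lockGlobalState(5)) { // retry up to 5 times before giving up
                throw new IllegalStateException("could not lock global state directory");
            }
            try {
                // ... initialize/restore global stores while holding the lock ...
            } finally {
                stateDirectory.unlockGlobalState(); // releases the FileLock and closes its channel
            }
        }
    }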

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
new file mode 100644
index 0000000..7343c85
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+interface StateManager {
+    File baseDir();
+
+    void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback);
+
+    void flush(InternalProcessorContext context);
+
+    void close(Map<TopicPartition, Long> offsets) throws IOException;
+
+    StateStore getGlobalStore(final String name);
+
+    StateStore getStore(final String name);
+
+    Map<TopicPartition, Long> checkpointedOffsets();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 07ae761..b714221 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -114,15 +114,17 @@ public class StreamTask extends AbstractTask implements Punctuator {
         // to corresponding source nodes in the processor topology
         partitionQueues = new HashMap<>();
 
+        TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
+
         for (TopicPartition partition : partitions) {
             SourceNode source = topology.source(partition.topic());
-            RecordQueue queue = createRecordQueue(partition, source);
+            RecordQueue queue = createRecordQueue(partition, source, timestampExtractor);
             partitionQueues.put(partition, queue);
         }
 
         this.logPrefix = String.format("task [%s]", id);
 
-        TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
+
         this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
 
         // initialize the consumed offset cache
@@ -137,8 +139,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
         // initialize the state stores
         log.info("{} Initializing state stores", logPrefix);
         initializeStateStores();
+        stateMgr.registerGlobalStateStores(topology.globalStateStores());
         initTopology();
-        ((ProcessorContextImpl) this.processorContext).initialized();
+        this.processorContext.initialized();
     }
 
     /**
@@ -379,8 +382,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
         return recordCollector.offsets();
     }
 
-    private RecordQueue createRecordQueue(TopicPartition partition, SourceNode source) {
-        return new RecordQueue(partition, source);
+    @SuppressWarnings("unchecked")
+    private RecordQueue createRecordQueue(TopicPartition partition, SourceNode source, final TimestampExtractor timestampExtractor) {
+        return new RecordQueue(partition, source, timestampExtractor);
     }
 
     private ProcessorRecordContext createRecordContext(final StampedRecord currRecord) {
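
For context, the extractor instantiated above comes from the standard
TIMESTAMP_EXTRACTOR_CLASS_CONFIG setting and is now created once per task
and shared by all record queues. A sketch of supplying one (the built-in
WallclockTimestampExtractor) via StreamsConfig; broker address and
application id are placeholders:

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    public class ExtractorConfigSketch {
        public static void main(String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // the class StreamTask instantiates and passes to each RecordQueue
            props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                      WallclockTimestampExtractor.class.getName());
            final StreamsConfig config = new StreamsConfig(props);
            System.out.println(config.getClass(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG));
        }
    }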

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 5641849..38961f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -202,7 +202,6 @@ public class StreamThread extends Thread {
     private final StreamsMetricsThreadImpl streamsMetrics;
     final StateDirectory stateDirectory;
     private String originalReset;
-
     private StreamPartitionAssignor partitionAssignor = null;
     private boolean cleanRun = false;
     private long timerStartedMs;
@@ -285,7 +284,8 @@ public class StreamThread extends Thread {
                         UUID processId,
                         Metrics metrics,
                         Time time,
-                        StreamsMetadataState streamsMetadataState) {
+                        StreamsMetadataState streamsMetadataState,
+                        final long cacheSizeBytes) {
         super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
         this.applicationId = applicationId;
         String threadName = getName();
@@ -303,8 +303,6 @@ public class StreamThread extends Thread {
         if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
             log.warn("Negative cache size passed in thread [{}]. Reverting to cache size of 0 bytes.", threadName);
         }
-        long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
-            config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG));
         this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.streamsMetrics);
 
 
@@ -425,6 +423,8 @@ public class StreamThread extends Thread {
         removeStreamTasks();
         removeStandbyTasks();
 
+        // clean up global tasks
+
         log.info("{} Stream thread shutdown complete", logPrefix);
         setState(State.NOT_RUNNING);
         streamsMetrics.removeAllSensors();
@@ -1108,7 +1108,7 @@ public class StreamThread extends Thread {
     }
 
     /**
-     * This class extends {@link #StreamsMetricsImpl(Metrics, String, String, Map)} and
+     * This class extends {@link StreamsMetricsImpl} and
      * overrides one of its functions for efficiency
      */
     private class StreamsMetricsThreadImpl extends StreamsMetricsImpl {
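
The per-thread cache size is no longer computed inside StreamThread; it is
now passed in as cacheSizeBytes. The caller is assumed to perform the
equivalent of the deleted computation, i.e. split the total cache budget
evenly across stream threads:

    import org.apache.kafka.streams.StreamsConfig;

    final class CacheSizeSketch {
        // equivalent of the deleted lines: total budget divided by thread count, floored at 0
        static long perThreadCacheSize(final StreamsConfig config) {
            return Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)
                    / config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG));
        }
    }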

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index 5494674..a59eb5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -43,12 +43,18 @@ import java.util.Set;
  * in a KafkaStreams application
  */
 public class StreamsMetadataState {
+    public static final HostInfo UNKNOWN_HOST = new HostInfo("unknown", -1);
     private final TopologyBuilder builder;
     private final List<StreamsMetadata> allMetadata = new ArrayList<>();
+    private final Set<String> globalStores;
+    private final HostInfo thisHost;
     private Cluster clusterMetadata;
+    private StreamsMetadata myMetadata;
 
-    public StreamsMetadataState(final TopologyBuilder builder) {
+    public StreamsMetadataState(final TopologyBuilder builder, final HostInfo thisHost) {
         this.builder = builder;
+        this.globalStores = builder.globalStateStores().keySet();
+        this.thisHost = thisHost;
     }
 
     /**
@@ -74,6 +80,10 @@ public class StreamsMetadataState {
             return Collections.emptyList();
         }
 
+        if (globalStores.contains(storeName)) {
+            return allMetadata;
+        }
+
         final Set<String> sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName);
         if (sourceTopics == null) {
             return Collections.emptyList();
@@ -114,6 +124,15 @@ public class StreamsMetadataState {
             return StreamsMetadata.NOT_AVAILABLE;
         }
 
+        if (globalStores.contains(storeName)) {
+            // global stores are on every node. If we don't have the host info
+            // for this host, just pick the first metadata
+            if (thisHost == UNKNOWN_HOST) {
+                return allMetadata.get(0);
+            }
+            return myMetadata;
+        }
+
         final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName);
         if (sourceTopicsInfo == null) {
             return null;
@@ -155,6 +174,15 @@ public class StreamsMetadataState {
             return StreamsMetadata.NOT_AVAILABLE;
         }
 
+        if (globalStores.contains(storeName)) {
+            // global stores are on every node. If we don't have the host info
+            // for this host, just pick the first metadata
+            if (thisHost == UNKNOWN_HOST) {
+                return allMetadata.get(0);
+            }
+            return myMetadata;
+        }
+
         SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName);
         if (sourceTopicsInfo == null) {
             return null;
@@ -198,7 +226,12 @@ public class StreamsMetadataState {
                     storesOnHost.add(storeTopicEntry.getKey());
                 }
             }
-            allMetadata.add(new StreamsMetadata(key, storesOnHost, partitionsForHost));
+            storesOnHost.addAll(globalStores);
+            final StreamsMetadata metadata = new StreamsMetadata(key, storesOnHost, partitionsForHost);
+            allMetadata.add(metadata);
+            if (key.equals(thisHost)) {
+                myMetadata = metadata;
+            }
         }
     }
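
Because global stores are materialized on every instance, metadata
discovery for them now returns all known hosts. A hedged client-side
sketch, where the store name is an assumption and streams is a running
KafkaStreams instance:

    import java.util.Collection;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.StreamsMetadata;

    final class MetadataSketch {
        static void printHosts(final KafkaStreams streams) {
            // for a global store this returns every instance of the application
            final Collection<StreamsMetadata> all = streams.allMetadataForStore("global-store");
            for (final StreamsMetadata metadata : all) {
                System.out.println(metadata.host() + ":" + metadata.port());
            }
        }
    }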
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 6da40b7..fdb03fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -132,6 +132,9 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor
     @Override
     public synchronized V get(final K key) {
         validateStoreOpen();
+        if (key == null) {
+            return null;
+        }
         final byte[] rawKey = serdes.rawKey(key);
         return get(rawKey);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
index 61cd950..eb57ace 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
@@ -22,13 +22,13 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 
 import java.util.NoSuchElementException;
 
-class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
+public class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
     private final String storeName;
     private final KeyValueIterator<K, V> underlying;
     private KeyValue<K, V> next;
     private volatile boolean open = true;
 
-    DelegatingPeekingKeyValueIterator(final String storeName, final KeyValueIterator<K, V> underlying) {
+    public DelegatingPeekingKeyValueIterator(final String storeName, final KeyValueIterator<K, V> underlying) {
         this.storeName = storeName;
         this.underlying = underlying;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
new file mode 100644
index 0000000..4957c03
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class GlobalStateStoreProvider implements StateStoreProvider {
+    private final Map<String, StateStore> globalStateStores;
+
+    public GlobalStateStoreProvider(Map<String, StateStore> globalStateStores) {
+        this.globalStateStores = globalStateStores;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
+        final StateStore store = globalStateStores.get(storeName);
+        if (store == null || !queryableStoreType.accepts(store)) {
+            return Collections.emptyList();
+        }
+        if (!store.isOpen()) {
+            throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
+        }
+        return (List<T>) Collections.singletonList(store);
+    }
+}
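
A hedged usage sketch of the provider with the built-in key-value store
type; the store map would normally come from the GlobalStateManager, and
the store name and value types here are assumptions:

    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
    import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;

    final class ProviderSketch {
        static void lookup(final Map<String, StateStore> globalStores) {
            final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(globalStores);
            // empty list if the store is unknown or of the wrong type;
            // InvalidStateStoreException if it exists but is not open
            final List<ReadOnlyKeyValueStore<String, Long>> stores =
                    provider.stores("global-store", QueryableStoreTypes.<String, Long>keyValueStore());
            System.out.println("matching stores: " + stores.size());
        }
    }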

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 849caa7..6dc08f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -83,6 +83,10 @@ class NamedCache {
     }
 
     synchronized LRUCacheEntry get(final Bytes key) {
+        if (key == null) {
+            return null;
+        }
+
         final LRUNode node = getInternal(key);
         if (node == null) {
             return null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index ff17e68..7d10055 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -55,10 +55,7 @@ public class OffsetCheckpoint {
     private final File file;
     private final Object lock;
 
-    /**
-     * @throws IOException
-     */
-    public OffsetCheckpoint(File file) throws IOException {
+    public OffsetCheckpoint(File file) {
         this.file = file;
         this.lock = new Object();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
index 64dac1f..419fc28 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -19,6 +19,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -27,20 +28,28 @@ import java.util.List;
 public class QueryableStoreProvider {
 
     private final List<StateStoreProvider> storeProviders;
+    private final GlobalStateStoreProvider globalStoreProvider;
 
-    public QueryableStoreProvider(final List<StateStoreProvider> storeProviders) {
+    public QueryableStoreProvider(final List<StateStoreProvider> storeProviders,
+                                  final GlobalStateStoreProvider globalStateStoreProvider) {
         this.storeProviders = new ArrayList<>(storeProviders);
+        this.globalStoreProvider = globalStateStoreProvider;
     }
 
     /**
      * Get a composite object wrapping the instances of the {@link StateStore} with the provided
      * storeName and {@link QueryableStoreType}
-     * @param storeName             name of the store
-     * @param queryableStoreType    accept stores passing {@link QueryableStoreType#accepts(StateStore)}
-     * @param <T>                   The expected type of the returned store
+     *
+     * @param storeName          name of the store
+     * @param queryableStoreType accept stores passing {@link QueryableStoreType#accepts(StateStore)}
+     * @param <T>                The expected type of the returned store
      * @return A composite object that wraps the store instances.
      */
     public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
+        final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
+        if (!globalStore.isEmpty()) {
+            return queryableStoreType.create(new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)), storeName);
+        }
         final List<T> allStores = new ArrayList<>();
         for (StateStoreProvider storeProvider : storeProviders) {
             allStores.addAll(storeProvider.stores(storeName, queryableStoreType));
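
From the application's point of view the query path is unchanged:
KafkaStreams#store(...) ends up here, and the global provider is simply
consulted first. A hedged sketch, where the store name, the types, and the
running streams instance are assumptions:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    final class QuerySketch {
        static Long lookup(final KafkaStreams streams) {
            // for a global table this resolves to the single local global store instance
            final ReadOnlyKeyValueStore<String, Long> view =
                    streams.store("global-store", QueryableStoreTypes.<String, Long>keyValueStore());
            return view.get("some-key");
        }
    }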

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 6f33a63..f8f0e08 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -104,6 +104,10 @@ public class ThreadCache {
     public LRUCacheEntry get(final String namespace, byte[] key) {
         numGets++;
 
+        if (key == null) {
+            return null;
+        }
+
         final NamedCache cache = getCache(namespace);
         if (cache == null) {
             return null;

