kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/5] kafka git commit: KAFKA-4490: Add Global Table support to Kafka Streams
Date Thu, 12 Jan 2017 19:46:06 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
new file mode 100644
index 0000000..d818d36
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class StateConsumerTest {
+
+    private static final long FLUSH_INTERVAL = 1000L;
+    private final TopicPartition topicOne = new TopicPartition("topic-one", 1);
+    private final TopicPartition topicTwo = new TopicPartition("topic-two", 1);
+    private final MockTime time = new MockTime();
+    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+    private final Map<TopicPartition, Long> partitionOffsets = new HashMap<>();
+    private GlobalStreamThread.StateConsumer stateConsumer;
+    private StateMaintainerStub stateMaintainer;
+
+    @Before
+    public void setUp() throws Exception {
+        partitionOffsets.put(topicOne, 20L);
+        partitionOffsets.put(topicTwo, 30L);
+        stateMaintainer = new StateMaintainerStub(partitionOffsets);
+        stateConsumer = new GlobalStreamThread.StateConsumer(consumer, stateMaintainer, time, 10L, FLUSH_INTERVAL);
+    }
+
+    @Test
+    public void shouldAssignPartitionsToConsumer() throws Exception {
+        stateConsumer.initialize();
+        assertEquals(Utils.mkSet(topicOne, topicTwo), consumer.assignment());
+    }
+
+    @Test
+    public void shouldSeekToInitialOffsets() throws Exception {
+        stateConsumer.initialize();
+        assertEquals(20L, consumer.position(topicOne));
+        assertEquals(30L, consumer.position(topicTwo));
+    }
+
+    @Test
+    public void shouldUpdateStateWithReceivedRecordsForPartition() throws Exception {
+        stateConsumer.initialize();
+        consumer.addRecord(new ConsumerRecord<>("topic-one", 1, 20L, new byte[0], new byte[0]));
+        consumer.addRecord(new ConsumerRecord<>("topic-one", 1, 21L, new byte[0], new byte[0]));
+        stateConsumer.pollAndUpdate();
+        assertEquals(2, stateMaintainer.updatedPartitions.get(topicOne).intValue());
+    }
+
+    @Test
+    public void shouldUpdateStateWithReceivedRecordsForAllTopicPartition() throws Exception {
+        stateConsumer.initialize();
+        consumer.addRecord(new ConsumerRecord<>("topic-one", 1, 20L, new byte[0], new byte[0]));
+        consumer.addRecord(new ConsumerRecord<>("topic-two", 1, 31L, new byte[0], new byte[0]));
+        consumer.addRecord(new ConsumerRecord<>("topic-two", 1, 32L, new byte[0], new byte[0]));
+        stateConsumer.pollAndUpdate();
+        assertEquals(1, stateMaintainer.updatedPartitions.get(topicOne).intValue());
+        assertEquals(2, stateMaintainer.updatedPartitions.get(topicTwo).intValue());
+    }
+
+    @Test
+    public void shouldFlushStoreWhenFlushIntervalHasLapsed() throws Exception {
+        stateConsumer.initialize();
+        consumer.addRecord(new ConsumerRecord<>("topic-one", 1, 20L, new byte[0], new byte[0]));
+        time.sleep(FLUSH_INTERVAL);
+
+        stateConsumer.pollAndUpdate();
+        assertTrue(stateMaintainer.flushed);
+    }
+
+    @Test
+    public void shouldNotFlushOffsetsWhenFlushIntervalHasNotLapsed() throws Exception {
+        stateConsumer.initialize();
+        consumer.addRecord(new ConsumerRecord<>("topic-one", 1, 20L, new byte[0], new byte[0]));
+        time.sleep(FLUSH_INTERVAL / 2);
+        stateConsumer.pollAndUpdate();
+        assertFalse(stateMaintainer.flushed);
+    }
+
+    @Test
+    public void shouldNotFlushWhenFlushIntervalIsNegative() throws Exception {
+        // a negative flush interval disables interval-based flushing entirely
+        stateConsumer = new GlobalStreamThread.StateConsumer(consumer, stateMaintainer, time, 10L, -1);
+        stateConsumer.initialize();
+        time.sleep(100);
+        stateConsumer.pollAndUpdate();
+        assertFalse(stateMaintainer.flushed);
+    }
+
+    @Test
+    public void shouldCloseConsumer() throws Exception {
+        stateConsumer.close();
+        assertTrue(consumer.closed());
+    }
+
+    @Test
+    public void shouldCloseStateMaintainer() throws Exception {
+        stateConsumer.close();
+        assertTrue(stateMaintainer.closed);
+    }
+
+    private static class StateMaintainerStub implements GlobalStateMaintainer {
+        private final Map<TopicPartition, Long> partitionOffsets;
+        // counts how many records update() received per partition
+        private final Map<TopicPartition, Integer> updatedPartitions = new HashMap<>();
+        private boolean flushed;
+        private boolean closed;
+
+        public StateMaintainerStub(final Map<TopicPartition, Long> partitionOffsets) {
+            this.partitionOffsets = partitionOffsets;
+        }
+
+        @Override
+        public Map<TopicPartition, Long> initialize() {
+            return partitionOffsets;
+        }
+
+        @Override
+        public void flushState() {
+            flushed = true;
+        }
+
+        @Override
+        public void close() {
+            closed = true;
+        }
+
+        @Override
+        public void update(final ConsumerRecord<byte[], byte[]> record) {
+            final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+            final Integer count = updatedPartitions.get(tp);
+            updatedPartitions.put(tp, count == null ? 1 : count + 1);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 6fc855c..fb55796 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -176,4 +176,26 @@ public class StateDirectoryTest {
         assertTrue(taskDir.exists());
     }
 
+    @Test(expected = OverlappingFileLockException.class)
+    public void shouldLockGlobalStateDirectory() throws Exception {
+        final FileChannel channel = FileChannel.open(new File(directory.globalStateDir(),
+                                                              StateDirectory.LOCK_FILE_NAME).toPath(),
+                                                     StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+        directory.lockGlobalState(1);
+        channel.lock();
+    }
+
+    @Test
+    public void shouldUnlockGlobalStateDirectory() throws Exception {
+        final FileChannel channel = FileChannel.open(new File(directory.globalStateDir(),
+                                                              StateDirectory.LOCK_FILE_NAME).toPath(),
+                                                     StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+        directory.lockGlobalState(1);
+
+        directory.unlockGlobalState();
+
+        // should lock without any exceptions
+        channel.lock();
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 3224c47..5aa40c8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -130,7 +130,8 @@ public class StreamPartitionAssignorTest {
 
         String clientId = "client-id";
         UUID processId = UUID.randomUUID();
-        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder)) {
+        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                                               0) {
             @Override
             public Set<TaskId> prevTasks() {
                 return prevTasks;
@@ -178,8 +179,11 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
+
         MockClientSupplier mockClientSupplier = new MockClientSupplier();
-        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                                                 0);
+
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
@@ -245,7 +249,7 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
@@ -289,7 +293,7 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
@@ -351,8 +355,10 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
+
         MockClientSupplier mockClientSupplier = new MockClientSupplier();
-        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                                                 0);
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
@@ -421,9 +427,10 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-        MockClientSupplier mockClientSupplier = new MockClientSupplier();
-        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
 
+        MockClientSupplier mockClientSupplier = new MockClientSupplier();
+        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                                                 0);
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
@@ -511,8 +518,9 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-        MockClientSupplier mockClientSupplier = new MockClientSupplier();
-        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                                                 0);
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
@@ -575,7 +583,8 @@ public class StreamPartitionAssignorTest {
         UUID uuid = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                                               0);
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
@@ -616,7 +625,8 @@ public class StreamPartitionAssignorTest {
 
         MockClientSupplier clientSupplier = new MockClientSupplier();
 
-        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                                                 0);
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
@@ -658,7 +668,8 @@ public class StreamPartitionAssignorTest {
 
         MockClientSupplier clientSupplier = new MockClientSupplier();
 
-        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                                                 0);
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
@@ -694,7 +705,8 @@ public class StreamPartitionAssignorTest {
 
         final MockClientSupplier clientSupplier = new MockClientSupplier();
 
-        final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                                                           0);
 
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
@@ -723,7 +735,8 @@ public class StreamPartitionAssignorTest {
 
         final MockClientSupplier clientSupplier = new MockClientSupplier();
 
-        final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                                                           0);
 
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
@@ -758,7 +771,8 @@ public class StreamPartitionAssignorTest {
         final MockClientSupplier clientSupplier = new MockClientSupplier();
 
         final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1,
-                new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+                                                           new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                                                           0);
 
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config, clientSupplier.restoreConsumer));
@@ -786,8 +800,8 @@ public class StreamPartitionAssignorTest {
         final MockClientSupplier clientSupplier = new MockClientSupplier();
 
         final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1,
-                new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
-
+                                                           new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                                                           0);
 
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
 
@@ -881,7 +895,7 @@ public class StreamPartitionAssignorTest {
         final String client = "client1";
 
         final StreamsConfig config = new StreamsConfig(configProps());
-        final StreamThread streamThread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        final StreamThread streamThread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
 
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client));
@@ -968,7 +982,7 @@ public class StreamPartitionAssignorTest {
         final UUID uuid = UUID.randomUUID();
         final String client = "client1";
 
-        final StreamThread streamThread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        final StreamThread streamThread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
 
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client));

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index adabde7..4b9f92f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -91,7 +91,8 @@ public class StreamTaskTest {
             Collections.<String, SinkNode>emptyMap(),
             Collections.<StateStore>emptyList(),
             Collections.<String, String>emptyMap(),
-            Collections.<StateStore, ProcessorNode>emptyMap());
+            Collections.<StateStore, ProcessorNode>emptyMap(),
+            Collections.<StateStore>emptyList());
     private File baseDir;
     private StateDirectory stateDirectory;
     private RecordCollectorImpl recordCollector;
@@ -349,11 +350,10 @@ public class StreamTaskTest {
                                                                  Collections.<String, SinkNode>emptyMap(),
                                                                  Collections.<StateStore>emptyList(),
                                                                  Collections.<String, String>emptyMap(),
-                                                                 Collections.<StateStore, ProcessorNode>emptyMap());
-
-        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions,
-            topology, consumer, restoreStateConsumer, config, streamsMetrics, stateDirectory, new ThreadCache("testCache", 0, streamsMetrics), new MockTime(), recordCollector);
+                                                                 Collections.<StateStore, ProcessorNode>emptyMap(),
+                                                                 Collections.<StateStore>emptyList());
 
+        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, streamsMetrics, stateDirectory,  new ThreadCache("testCache", 0, streamsMetrics), new MockTime(), recordCollector);
         final int offset = 20;
         streamTask.addRecords(partition1, Collections.singletonList(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -400,11 +400,12 @@ public class StreamTaskTest {
                                                                  Collections.<String, SinkNode>emptyMap(),
                                                                  Collections.<StateStore>emptyList(),
                                                                  Collections.<String, String>emptyMap(),
-                                                                 Collections.<StateStore, ProcessorNode>emptyMap());
-
+                                                                 Collections.<StateStore, ProcessorNode>emptyMap(),
+                                                                 Collections.<StateStore>emptyList());
         final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
-        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions,
-            topology, consumer, restoreStateConsumer, config, streamsMetrics, stateDirectory, new ThreadCache("testCache", 0, streamsMetrics), new MockTime(), recordCollector);
+        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer,
+                                                     restoreStateConsumer, config, streamsMetrics, stateDirectory,
+                                                     new ThreadCache("testCache", 0, streamsMetrics), new MockTime(), recordCollector);
 
         try {
             streamTask.punctuate(punctuator, 1);
@@ -423,7 +424,8 @@ public class StreamTaskTest {
                                                                  Collections.<String, SinkNode>emptyMap(),
                                                                  Collections.<StateStore>emptyList(),
                                                                  Collections.<String, String>emptyMap(),
-                                                                 Collections.<StateStore, ProcessorNode>emptyMap());
+                                                                 Collections.<StateStore, ProcessorNode>emptyMap(),
+                                                                 Collections.<StateStore>emptyList());
         final AtomicBoolean flushed = new AtomicBoolean(false);
         final NoOpRecordCollector recordCollector = new NoOpRecordCollector() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 454b4c2..8f03e4f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -206,8 +206,9 @@ public class StreamThreadTest {
         builder.addSource("source3", "topic3");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
 
+
         MockClientSupplier mockClientSupplier = new MockClientSupplier();
-        StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder)) {
+        StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
 
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
@@ -351,8 +352,8 @@ public class StreamThreadTest {
         final MockClientSupplier mockClientSupplier = new MockClientSupplier();
         mockClientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1)));
 
-        final StreamThread thread1 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 1, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
-        final StreamThread thread2 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 2, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        final StreamThread thread1 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 1, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
+        final StreamThread thread2 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 2, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
 
         final Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), task0Assignment);
         final Map<TaskId, Set<TopicPartition>> task1 = Collections.singletonMap(new TaskId(0, 1), task1Assignment);
@@ -432,7 +433,7 @@ public class StreamThreadTest {
 
         Metrics metrics = new Metrics();
         StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-            clientId,  processId, metrics, new MockTime(), new StreamsMetadataState(builder));
+            clientId,  processId, metrics, new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
         String defaultGroupName = "stream-metrics";
         String defaultPrefix = "thread." + thread.threadClientId();
         Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.threadClientId());
@@ -490,7 +491,9 @@ public class StreamThreadTest {
             builder.addSource("source1", "topic1");
 
             MockClientSupplier mockClientSupplier = new MockClientSupplier();
-            StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId,  processId, new Metrics(), mockTime, new StreamsMetadataState(builder)) {
+            StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                                                   0) {
+
                 @Override
                 public void maybeClean() {
                     super.maybeClean();
@@ -618,7 +621,9 @@ public class StreamThreadTest {
             builder.addSource("source1", "topic1");
 
             MockClientSupplier mockClientSupplier = new MockClientSupplier();
-            StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId,  processId, new Metrics(), mockTime, new StreamsMetadataState(builder)) {
+            StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                                                   0) {
+
                 @Override
                 public void maybeCommit() {
                     super.maybeCommit();
@@ -691,7 +696,8 @@ public class StreamThreadTest {
         StreamsConfig config = new StreamsConfig(configProps());
         MockClientSupplier clientSupplier = new MockClientSupplier();
         StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-                                               clientId,  processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder));
+                                               clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                                               0);
         assertSame(clientSupplier.producer, thread.producer);
         assertSame(clientSupplier.consumer, thread.consumer);
         assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);
@@ -707,7 +713,7 @@ public class StreamThreadTest {
 
         final StreamsConfig config = new StreamsConfig(configProps());
         final StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId,
-                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder));
+                                               clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
 
         thread.partitionAssignor(new StreamPartitionAssignor() {
             @Override
@@ -730,7 +736,7 @@ public class StreamThreadTest {
         final MockClientSupplier clientSupplier = new MockClientSupplier();
 
         final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder));
+                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
 
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
         restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
@@ -781,7 +787,7 @@ public class StreamThreadTest {
         final MockClientSupplier clientSupplier = new MockClientSupplier();
 
         final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder));
+                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
         restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
                                          Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog",
@@ -847,7 +853,7 @@ public class StreamThreadTest {
         final Map<Collection<TopicPartition>, TestStreamTask> createdTasks = new HashMap<>();
 
         final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder)) {
+                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 final ProcessorTopology topology = builder.build(id.topicGroupId);
@@ -916,7 +922,8 @@ public class StreamThreadTest {
         final StreamsConfig config1 = new StreamsConfig(configProps());
 
         final StreamThread thread = new StreamThread(builder, config1, clientSupplier, applicationId,
-                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder)) {
+                                                     clientId, processId, new Metrics(), new MockTime(),
+                                                     new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 return testStreamTask;
@@ -966,7 +973,8 @@ public class StreamThreadTest {
         };
 
         final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder)) {
+                                                     clientId, processId, new Metrics(), new MockTime(),
+                                                     new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 return testStreamTask;
@@ -1018,7 +1026,8 @@ public class StreamThreadTest {
         final StreamsConfig config1 = new StreamsConfig(configProps());
 
         final StreamThread thread = new StreamThread(builder, config1, clientSupplier, applicationId,
-                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder)) {
+                                                     clientId, processId, new Metrics(), new MockTime(),
+                                                     new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 return testStreamTask;
@@ -1068,7 +1077,8 @@ public class StreamThreadTest {
         final StreamsConfig config1 = new StreamsConfig(configProps());
 
         final StreamThread thread = new StreamThread(builder, config1, clientSupplier, applicationId,
-                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder)) {
+                                                     clientId, processId, new Metrics(), new MockTime(),
+                                                     new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 return testStreamTask;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index f4772f0..5bdc4f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -42,6 +42,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -61,6 +62,8 @@ public class StreamsMetadataStateTest {
     private TopicPartition topic4P0;
     private List<PartitionInfo> partitionInfos;
     private Cluster cluster;
+    private final String globalTable = "global-table";
+    private StreamPartitioner<String, Object> partitioner;
 
     @Before
     public void before() {
@@ -84,6 +87,8 @@ public class StreamsMetadataStateTest {
             }
         });
 
+        builder.globalTable("global-topic", "global-table");
+
         builder.setApplicationId("appId");
 
         topic1P0 = new TopicPartition("topic-one", 0);
@@ -110,22 +115,28 @@ public class StreamsMetadataStateTest {
                 new PartitionInfo("topic-four", 0, null, null, null));
 
         cluster = new Cluster(null, Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet());
-        discovery = new StreamsMetadataState(builder);
+        discovery = new StreamsMetadataState(builder, hostOne);
         discovery.onChange(hostToPartitions, cluster);
+        partitioner = new StreamPartitioner<String, Object>() {
+            @Override
+            public Integer partition(final String key, final Object value, final int numPartitions) {
+                return 1;
+            }
+        };
     }
 
     @Test
     public void shouldNotThrowNPEWhenOnChangeNotCalled() throws Exception {
-        new StreamsMetadataState(builder).getAllMetadataForStore("store");
+        new StreamsMetadataState(builder, hostOne).getAllMetadataForStore("store");
     }
 
     @Test
     public void shouldGetAllStreamInstances() throws Exception {
-        final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet("table-one", "table-two", "merged-table"),
+        final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet(globalTable, "table-one", "table-two", "merged-table"),
                 Utils.mkSet(topic1P0, topic2P1, topic4P0));
-        final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet("table-two", "table-one", "merged-table"),
+        final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet(globalTable, "table-two", "table-one", "merged-table"),
                 Utils.mkSet(topic2P0, topic1P1));
-        final StreamsMetadata three = new StreamsMetadata(hostThree, Collections.singleton("table-three"),
+        final StreamsMetadata three = new StreamsMetadata(hostThree, Utils.mkSet(globalTable, "table-three"),
                 Collections.singleton(topic3P0));
 
         Collection<StreamsMetadata> actual = discovery.getAllMetadata();
@@ -150,7 +161,7 @@ public class StreamsMetadataStateTest {
 
         discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp5, new PartitionInfo("topic-five", 1, null, null, null))));
 
-        final StreamsMetadata expected = new StreamsMetadata(hostFour, Collections.<String>emptySet(),
+        final StreamsMetadata expected = new StreamsMetadata(hostFour, Collections.singleton(globalTable),
                 Collections.singleton(tp5));
         final Collection<StreamsMetadata> actual = discovery.getAllMetadata();
         assertTrue("expected " + actual + " to contain " + expected, actual.contains(expected));
@@ -158,9 +169,9 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldGetInstancesForStoreName() throws Exception {
-        final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet("table-one", "table-two", "merged-table"),
+        final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet(globalTable, "table-one", "table-two", "merged-table"),
                 Utils.mkSet(topic1P0, topic2P1, topic4P0));
-        final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet("table-two", "table-one", "merged-table"),
+        final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet(globalTable, "table-two", "table-one", "merged-table"),
                 Utils.mkSet(topic2P0, topic1P1));
         final Collection<StreamsMetadata> actual = discovery.getAllMetadataForStore("table-one");
         assertEquals(2, actual.size());
@@ -186,7 +197,7 @@ public class StreamsMetadataStateTest {
 
         discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
 
-        final StreamsMetadata expected = new StreamsMetadata(hostThree, Collections.singleton("table-three"),
+        final StreamsMetadata expected = new StreamsMetadata(hostThree, Utils.mkSet(globalTable, "table-three"),
                 Collections.singleton(topic3P0));
 
         final StreamsMetadata actual = discovery.getMetadataWithKey("table-three", "the-key",
@@ -202,15 +213,10 @@ public class StreamsMetadataStateTest {
 
         discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
 
-        final StreamsMetadata expected = new StreamsMetadata(hostTwo, Utils.mkSet("table-two", "table-three", "merged-table"),
+        final StreamsMetadata expected = new StreamsMetadata(hostTwo, Utils.mkSet(globalTable, "table-two", "table-three", "merged-table"),
                 Utils.mkSet(topic2P0, tp4));
 
-        StreamsMetadata actual = discovery.getMetadataWithKey("table-three", "the-key", new StreamPartitioner<String, Object>() {
-            @Override
-            public Integer partition(final String key, final Object value, final int numPartitions) {
-                return 1;
-            }
-        });
+        StreamsMetadata actual = discovery.getMetadataWithKey("table-three", "the-key", partitioner);
         assertEquals(expected, actual);
     }
 
@@ -227,7 +233,7 @@ public class StreamsMetadataStateTest {
         hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1, topic2P2));
         discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(topic2P2, new PartitionInfo("topic-two", 2, null, null, null))));
 
-        final StreamsMetadata expected = new StreamsMetadata(hostTwo, Utils.mkSet("table-two", "table-one", "merged-table"),
+        final StreamsMetadata expected = new StreamsMetadata(hostTwo, Utils.mkSet(globalTable, "table-two", "table-one", "merged-table"),
                 Utils.mkSet(topic2P0, topic1P1, topic2P2));
 
         final StreamsMetadata actual = discovery.getMetadataWithKey("merged-table", "123", new StreamPartitioner<String, Object>() {
@@ -270,5 +276,40 @@ public class StreamsMetadataStateTest {
         discovery.getMetadataWithKey(null, "key", (StreamPartitioner) null);
     }
 
+    @Test
+    public void shouldHaveGlobalStoreInAllMetadata() throws Exception {
+        final Collection<StreamsMetadata> metadata = discovery.getAllMetadataForStore(globalTable);
+        assertEquals(3, metadata.size());
+        for (StreamsMetadata streamsMetadata : metadata) {
+            assertTrue(streamsMetadata.stateStoreNames().contains(globalTable));
+        }
+    }
+
+    @Test
+    public void shouldGetMyMetadataForGlobalStoreWithKey() throws Exception {
+        final StreamsMetadata metadata = discovery.getMetadataWithKey(globalTable, "key", Serdes.String().serializer());
+        assertEquals(hostOne, metadata.hostInfo());
+    }
+
+    @Test
+    public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() throws Exception {
+        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST);
+        streamsMetadataState.onChange(hostToPartitions, cluster);
+        assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", Serdes.String().serializer()));
+    }
+
+    @Test
+    public void shouldGetMyMetadataForGlobalStoreWithKeyAndPartitioner() throws Exception {
+        final StreamsMetadata metadata = discovery.getMetadataWithKey(globalTable, "key", partitioner);
+        assertEquals(hostOne, metadata.hostInfo());
+    }
+
+    @Test
+    public void shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() throws Exception {
+        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST);
+        streamsMetadataState.onChange(hostToPartitions, cluster);
+        assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", partitioner));
+    }
+
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 4eadfa3..8746a86 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.test.InMemoryKeyValueStore;
 import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Before;
 import org.junit.Test;
@@ -209,6 +210,11 @@ public class CachingKeyValueStoreTest {
         store.delete("key");
     }
 
+    @Test
+    public void shouldReturnNullIfKeyIsNull() throws Exception {
+        assertNull(store.get(null));
+    }
+
     private int addItemsToCache() throws IOException {
         int cachedSize = 0;
         int i = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 05c32f0..0fd6001 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -18,6 +18,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.test.NoOpReadOnlyStore;
 import org.apache.kafka.test.StateStoreProviderStub;
 import org.junit.Before;
 import org.junit.Test;
@@ -26,7 +27,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import static org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest.toList;
+import static org.apache.kafka.test.StreamsTestUtils.toList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -182,7 +183,7 @@ public class CompositeReadOnlyKeyValueStoreTest {
 
     @Test
     public void shouldReturnLongMaxValueOnOverflow() throws Exception {
-        stubProviderTwo.addStore(storeName, new StateStoreTestUtils.NoOpReadOnlyStore<Object, Object>() {
+        stubProviderTwo.addStore(storeName, new NoOpReadOnlyStore<Object, Object>() {
             @Override
             public long approximateNumEntries() {
                 return Long.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
index fc4a4c5..8bfcb7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
@@ -32,7 +32,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import static org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest.toList;
+import static org.apache.kafka.test.StreamsTestUtils.toList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index d098429..dc1fc5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -19,13 +19,12 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.StateStoreProviderStub;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 
 import static java.util.Arrays.asList;
@@ -65,7 +64,7 @@ public class CompositeReadOnlyWindowStoreTest {
         underlyingWindowStore.put("my-key", "my-later-value", 10L);
 
         final WindowStoreIterator<String> iterator = windowStore.fetch("my-key", 0L, 25L);
-        final List<KeyValue<Long, String>> results = toList(iterator);
+        final List<KeyValue<Long, String>> results = StreamsTestUtils.toList(iterator);
 
         assertEquals(asList(new KeyValue<>(0L, "my-value"),
                             new KeyValue<>(10L, "my-later-value")),
@@ -87,10 +86,10 @@ public class CompositeReadOnlyWindowStoreTest {
         underlyingWindowStore.put("key-one", "value-one", 0L);
         secondUnderlying.put("key-two", "value-two", 10L);
 
-        final List<KeyValue<Long, String>> keyOneResults = toList(windowStore.fetch("key-one", 0L,
-                                                                                    1L));
-        final List<KeyValue<Long, String>> keyTwoResults = toList(windowStore.fetch("key-two", 10L,
-                                                                                    11L));
+        final List<KeyValue<Long, String>> keyOneResults = StreamsTestUtils.toList(windowStore.fetch("key-one", 0L,
+                                                                                                     1L));
+        final List<KeyValue<Long, String>> keyTwoResults = StreamsTestUtils.toList(windowStore.fetch("key-two", 10L,
+                                                                                                     11L));
 
         assertEquals(Collections.singletonList(KeyValue.pair(0L, "value-one")), keyOneResults);
         assertEquals(Collections.singletonList(KeyValue.pair(10L, "value-two")), keyTwoResults);
@@ -101,7 +100,7 @@ public class CompositeReadOnlyWindowStoreTest {
         otherUnderlyingStore.put("some-key", "some-value", 0L);
         underlyingWindowStore.put("some-key", "my-value", 1L);
 
-        final List<KeyValue<Long, String>> results = toList(windowStore.fetch("some-key", 0L, 2L));
+        final List<KeyValue<Long, String>> results = StreamsTestUtils.toList(windowStore.fetch("some-key", 0L, 2L));
         assertEquals(Collections.singletonList(new KeyValue<>(1L, "my-value")), results);
     }
 
@@ -118,12 +117,4 @@ public class CompositeReadOnlyWindowStoreTest {
         underlyingWindowStore.fetch("key", 1, 10);
     }
 
-    static <K, V> List<KeyValue<K, V>> toList(final Iterator<KeyValue<K, V>> iterator) {
-        final List<KeyValue<K, V>> results = new ArrayList<>();
-
-        while (iterator.hasNext()) {
-            results.add(iterator.next());
-        }
-        return results;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
index bca4837..50845e8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.test.InMemoryKeyValueStore;
 import org.junit.Before;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
new file mode 100644
index 0000000..ab2ef7e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.test.NoOpReadOnlyStore;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class GlobalStateStoreProviderTest {
+
+    @Test
+    public void shouldReturnSingleItemListIfStoreExists() throws Exception {
+        final GlobalStateStoreProvider provider =
+                new GlobalStateStoreProvider(Collections.<String, StateStore>singletonMap("global", new NoOpReadOnlyStore<>()));
+        final List<ReadOnlyKeyValueStore<Object, Object>> stores = provider.stores("global", QueryableStoreTypes.keyValueStore());
+        assertEquals(1, stores.size());
+    }
+
+    @Test
+    public void shouldReturnEmptyItemListIfStoreDoesntExist() throws Exception {
+        final GlobalStateStoreProvider provider =
+                new GlobalStateStoreProvider(Collections.<String, StateStore>emptyMap());
+        final List<ReadOnlyKeyValueStore<Object, Object>> stores = provider.stores("global", QueryableStoreTypes.keyValueStore());
+        assertTrue(stores.isEmpty());
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowExceptionIfStoreIsntOpen() throws Exception {
+        final NoOpReadOnlyStore<Object, Object> store = new NoOpReadOnlyStore<>();
+        store.close();
+        final GlobalStateStoreProvider provider =
+                new GlobalStateStoreProvider(Collections.<String, StateStore>singletonMap("global", store));
+        provider.stores("global", QueryableStoreTypes.keyValueStore());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
deleted file mode 100644
index 6bb27b7..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements.  See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.  You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.TreeMap;
-
-class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
-    private final TreeMap<K, V> map = new TreeMap<>();
-    private final String name;
-    private boolean open = true;
-
-    InMemoryKeyValueStore(final String name) {
-        this.name = name;
-    }
-
-    @Override
-    public void put(final K key, final V value) {
-        map.put(key, value);
-    }
-
-    @Override
-    public V putIfAbsent(final K key, final V value) {
-        V orig = map.get(key);
-        if (orig == null) {
-            map.put(key, value);
-        }
-        return orig;
-    }
-
-    @Override
-    public void putAll(final List<KeyValue<K, V>> entries) {
-        for (KeyValue<K, V> entry : entries) {
-            map.put(entry.key, entry.value);
-        }
-    }
-
-    @Override
-    public V delete(final K key) {
-        return map.remove(key);
-    }
-
-    @Override
-    public long approximateNumEntries() {
-        return map.size();
-    }
-
-    @Override
-    public String name() {
-        return name;
-    }
-
-    @Override
-    public void init(final ProcessorContext context, final StateStore root) {
-        // no-op
-    }
-
-    @Override
-    public void flush() {
-        //no-op
-    }
-
-    @Override
-    public void close() {
-        open = false;
-    }
-
-    @Override
-    public boolean persistent() {
-        return false;
-    }
-
-    @Override
-    public boolean isOpen() {
-        return open;
-    }
-
-    @Override
-    public V get(final K key) {
-        return map.get(key);
-    }
-
-    @Override
-    public KeyValueIterator<K, V> range(final K from, final K to) {
-        return new DelegatingPeekingKeyValueIterator<>(name, new TheIterator(this.map.subMap(from, true, to, false).entrySet().iterator()));
-    }
-
-    @Override
-    public KeyValueIterator<K, V> all() {
-        return new DelegatingPeekingKeyValueIterator<>(name, new TheIterator(map.entrySet().iterator()));
-    }
-
-    private class TheIterator implements KeyValueIterator<K, V> {
-
-        private final Iterator<Map.Entry<K, V>> underlying;
-
-        public TheIterator(final Iterator<Map.Entry<K, V>> iterator) {
-            this.underlying = iterator;
-        }
-
-        @Override
-        public void close() {
-
-        }
-
-        @Override
-        public K peekNextKey() {
-            throw new UnsupportedOperationException("peekNextKey not supported");
-        }
-
-        @Override
-        public boolean hasNext() {
-            return underlying.hasNext();
-        }
-
-        @Override
-        public KeyValue<K, V> next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            Map.Entry<K, V> next = underlying.next();
-            return new KeyValue<>(next.getKey(), next.getValue());
-        }
-
-        @Override
-        public void remove() {
-
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
index df2fbca..fbcefab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.InMemoryKeyValueStore;
 import org.junit.Before;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 8efa024..050e3da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -306,4 +306,8 @@ public class NamedCacheTest {
         cache.evict();
     }
 
+    @Test
+    public void shouldReturnNullIfKeyIsNull() throws Exception {
+        assertNull(cache.get(null));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
index 3660e8e..fcf8760 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
@@ -16,13 +16,16 @@ package org.apache.kafka.streams.state.internals;
 
 
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.NoOpWindowStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.test.NoOpReadOnlyStore;
 import org.apache.kafka.test.StateStoreProviderStub;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.HashMap;
 
 import static org.junit.Assert.assertNotNull;
 
@@ -31,15 +34,17 @@ public class QueryableStoreProviderTest {
     private final String keyValueStore = "key-value";
     private final String windowStore = "window-store";
     private QueryableStoreProvider storeProvider;
+    private HashMap<String, StateStore> globalStateStores;
 
     @Before
     public void before() {
         final StateStoreProviderStub theStoreProvider = new StateStoreProviderStub(false);
-        theStoreProvider.addStore(keyValueStore, new StateStoreTestUtils.NoOpReadOnlyStore<>());
+        theStoreProvider.addStore(keyValueStore, new NoOpReadOnlyStore<>());
         theStoreProvider.addStore(windowStore, new NoOpWindowStore());
+        globalStateStores = new HashMap<>();
         storeProvider =
             new QueryableStoreProvider(
-                Collections.<StateStoreProvider>singletonList(theStoreProvider));
+                    Collections.<StateStoreProvider>singletonList(theStoreProvider), new GlobalStateStoreProvider(globalStateStores));
     }
 
     @Test(expected = InvalidStateStoreException.class)
@@ -72,5 +77,11 @@ public class QueryableStoreProviderTest {
         storeProvider.getStore(keyValueStore, QueryableStoreTypes.windowStore());
     }
 
+    @Test
+    public void shouldFindGlobalStores() throws Exception {
+        globalStateStores.put("global", new NoOpReadOnlyStore<>());
+        assertNotNull(storeProvider.getStore("global", QueryableStoreTypes.keyValueStore()));
+    }
+
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
index d4cc99b..5fc9e1f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
@@ -17,11 +17,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
@@ -46,58 +43,4 @@ public class StateStoreTestUtils {
 
     }
 
-    static class NoOpReadOnlyStore<K, V>
-            implements ReadOnlyKeyValueStore<K, V>, StateStore {
-
-        @Override
-        public V get(final K key) {
-            return null;
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(final K from, final K to) {
-            return null;
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            return null;
-        }
-
-        @Override
-        public long approximateNumEntries() {
-            return 0L;
-        }
-
-        @Override
-        public String name() {
-            return "";
-        }
-
-        @Override
-        public void init(final ProcessorContext context, final StateStore root) {
-
-        }
-
-        @Override
-        public void flush() {
-
-        }
-
-        @Override
-        public void close() {
-
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-
-        @Override
-        public boolean isOpen() {
-            return false;
-        }
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index a6f7f81..ffc4485 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -112,7 +112,8 @@ public class StreamThreadStateStoreProviderTest {
         thread = new StreamThread(builder, streamsConfig, clientSupplier,
                                   applicationId,
                                   "clientId", UUID.randomUUID(), new Metrics(),
-                                  Time.SYSTEM, new StreamsMetadataState(builder)) {
+                                  Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                                  0) {
             @Override
             public Map<TaskId, StreamTask> tasks() {
                 return tasks;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 97a1f8b..499d823 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -510,6 +510,13 @@ public class ThreadCacheTest {
         assertNull(cache.get("two", new byte[] {1}));
     }
 
+    @Test
+    public void shouldReturnNullIfKeyIsNull() throws Exception {
+        final ThreadCache threadCache = new ThreadCache("testCache", 10, new MockStreamsMetrics(new Metrics()));
+        threadCache.put("one", new byte[]{1}, cleanEntry(new byte[] {1}));
+        assertNull(threadCache.get("one", null));
+    }
+
     private LRUCacheEntry dirtyEntry(final byte[] key) {
         return new LRUCacheEntry(key, true, -1, -1, -1, "");
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
new file mode 100644
index 0000000..2f3ef26
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.GlobalStateManager;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+public class GlobalStateManagerStub implements GlobalStateManager {
+
+    private final Set<String> storeNames;
+    private final Map<TopicPartition, Long> offsets;
+    public boolean initialized;
+    public boolean closed;
+
+    public GlobalStateManagerStub(final Set<String> storeNames, final Map<TopicPartition, Long> offsets) {
+        this.storeNames = storeNames;
+        this.offsets = offsets;
+    }
+
+    @Override
+    public Set<String> initialize(final InternalProcessorContext processorContext) {
+        initialized = true;
+        return storeNames;
+    }
+    
+    @Override
+    public File baseDir() {
+        return null;
+    }
+
+    @Override
+    public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) {
+
+    }
+
+    @Override
+    public void flush(final InternalProcessorContext context) {
+
+    }
+
+    @Override
+    public void close(final Map<TopicPartition, Long> offsets) throws IOException {
+        this.offsets.putAll(offsets);
+        closed = true;
+    }
+
+    @Override
+    public StateStore getGlobalStore(final String name) {
+        return null;
+    }
+
+    @Override
+    public StateStore getStore(final String name) {
+        return null;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> checkpointedOffsets() {
+        return offsets;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/test/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/InMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/InMemoryKeyValueStore.java
new file mode 100644
index 0000000..fc9b32b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/InMemoryKeyValueStore.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+
+public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
+    private final TreeMap<K, V> map = new TreeMap<>();
+    private final String name;
+    private boolean open = true;
+
+    public InMemoryKeyValueStore(final String name) {
+        this.name = name;
+    }
+
+    @Override
+    public void put(final K key, final V value) {
+        map.put(key, value);
+    }
+
+    @Override
+    public V putIfAbsent(final K key, final V value) {
+        V orig = map.get(key);
+        if (orig == null) {
+            map.put(key, value);
+        }
+        return orig;
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<K, V>> entries) {
+        for (KeyValue<K, V> entry : entries) {
+            map.put(entry.key, entry.value);
+        }
+    }
+
+    @Override
+    public V delete(final K key) {
+        return map.remove(key);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return map.size();
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        // no-op
+    }
+
+    @Override
+    public void flush() {
+        //no-op
+    }
+
+    @Override
+    public void close() {
+        open = false;
+    }
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public V get(final K key) {
+        return map.get(key);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> range(final K from, final K to) {
+        return new DelegatingPeekingKeyValueIterator<>(name, new TheIterator(this.map.subMap(from, true, to, true).entrySet().iterator()));
+    }
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        return new DelegatingPeekingKeyValueIterator<>(name(), new TheIterator(map.entrySet().iterator()));
+    }
+
+    private class TheIterator implements KeyValueIterator<K, V> {
+
+        private final Iterator<Map.Entry<K, V>> underlying;
+
+        public TheIterator(final Iterator<Map.Entry<K, V>> iterator) {
+            this.underlying = iterator;
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public K peekNextKey() {
+            throw new UnsupportedOperationException("peekNextKey not supported");
+        }
+
+        @Override
+        public boolean hasNext() {
+            return underlying.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, V> next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            Map.Entry<K, V> next = underlying.next();
+            return new KeyValue<>(next.getKey(), next.getValue());
+        }
+
+        @Override
+        public void remove() {
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index e471300..b38daf1 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -44,6 +44,7 @@ public class KStreamTestDriver {
 
     private final ProcessorTopology topology;
     private final MockProcessorContext context;
+    private final ProcessorTopology globalTopology;
     private ThreadCache cache;
     private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;
     public final File stateDir;
@@ -76,13 +77,22 @@ public class KStreamTestDriver {
                              long cacheSize) {
         builder.setApplicationId("TestDriver");
         this.topology = builder.build(null);
+        this.globalTopology = builder.buildGlobalStateTopology();
         this.stateDir = stateDir;
         this.cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new Metrics()));
         this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
         this.context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
+        // init global topology first as it will add stores to the
+        // store map that are required for joins etc.
+        if (globalTopology != null) {
+            initTopology(globalTopology, globalTopology.globalStateStores());
+        }
+        initTopology(topology, topology.stateStores());
 
+    }
 
-        for (StateStore store : topology.stateStores()) {
+    private void initTopology(final ProcessorTopology topology, final List<StateStore> stores) {
+        for (StateStore store : stores) {
             store.init(context, store);
         }
 
@@ -94,9 +104,9 @@ public class KStreamTestDriver {
                 context.setCurrentNode(null);
             }
         }
-
     }
 
+
     public ProcessorContext context() {
         return context;
     }
@@ -104,6 +114,9 @@ public class KStreamTestDriver {
     public void process(String topicName, Object key, Object value) {
         final ProcessorNode previous = currNode;
         currNode = topology.source(topicName);
+        if (currNode == null && globalTopology != null) {
+            currNode = globalTopology.source(topicName);
+        }
 
         // if currNode is null, check if this topic is a changelog topic;
         // if yes, skip
@@ -247,6 +260,17 @@ public class KStreamTestDriver {
         currNode = currentNode;
     }
 
+    public StateStore globalStateStore(final String storeName) {
+        if (globalTopology != null) {
+            for (final StateStore store : globalTopology.globalStateStores()) {
+                if (store.name().equals(storeName)) {
+                    return store;
+                }
+            }
+        }
+        return null;
+    }
+
 
     private class MockRecordCollector extends RecordCollectorImpl {
         public MockRecordCollector() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/test/KTableValueGetterStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KTableValueGetterStub.java b/streams/src/test/java/org/apache/kafka/test/KTableValueGetterStub.java
new file mode 100644
index 0000000..ffcae7e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/KTableValueGetterStub.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class KTableValueGetterStub<K, V> implements KTableValueGetter<K, V> {
+
+    private final Map<K, V> data = new HashMap<>();
+
+    @Override
+    public void init(final ProcessorContext context) {
+
+    }
+
+    @Override
+    public V get(final K key) {
+        return data.get(key);
+    }
+
+    public void put(final K key, V value) {
+        data.put(key, value);
+    }
+
+    public void remove(final K key) {
+        data.remove(key);
+    }
+}


Mime
View raw message