kafka-commits mailing list archives

From guozh...@apache.org
Subject [3/5] kafka git commit: KAFKA-4490: Add Global Table support to Kafka Streams
Date Thu, 12 Jan 2017 19:46:07 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 50efe5b..41277c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 
@@ -53,19 +54,29 @@ public class KafkaStreamsTest {
     // quick enough)
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final KStreamBuilder builder = new KStreamBuilder();
+    private KafkaStreams streams;
+    private Properties props;
 
-    @Test
-    public void testStartAndClose() throws Exception {
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testStartAndClose");
+    @Before
+    public void before() {
+        props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streams = new KafkaStreams(builder, props);
+    }
 
+    @Test
+    public void testInitializesAndDestroysMetricsReporters() throws Exception {
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
-        final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
-
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
+        final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
+        final int initDiff = newInitCount - oldInitCount;
+        assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);
+
         StateListenerStub stateListener = new StateListenerStub();
         streams.setStateListener(stateListener);
         Assert.assertEquals(streams.state(), KafkaStreams.State.CREATED);
@@ -77,14 +88,9 @@ public class KafkaStreamsTest {
         Assert.assertEquals(stateListener.oldState, KafkaStreams.State.CREATED);
         Assert.assertEquals(stateListener.newState, KafkaStreams.State.RUNNING);
         Assert.assertEquals(stateListener.mapStates.get(KafkaStreams.State.RUNNING).longValue(), 1L);
-
-        final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
-        final int initCountDifference = newInitCount - oldInitCount;
-        assertTrue("some reporters should be initialized by calling start()", initCountDifference > 0);
-
-        assertTrue(streams.close(15, TimeUnit.SECONDS));
-        Assert.assertEquals("each reporter initialized should also be closed",
-            oldCloseCount + initCountDifference, MockMetricsReporter.CLOSE_COUNT.get());
+        final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
+        streams.close();
+        assertEquals(oldCloseCount + initDiff, MockMetricsReporter.CLOSE_COUNT.get());
         Assert.assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
         Assert.assertEquals(stateListener.mapStates.get(KafkaStreams.State.RUNNING).longValue(), 1L);
         Assert.assertEquals(stateListener.mapStates.get(KafkaStreams.State.NOT_RUNNING).longValue(), 1L);
@@ -92,13 +98,6 @@ public class KafkaStreamsTest {
 
     @Test
     public void testCloseIsIdempotent() throws Exception {
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCloseIsIdempotent");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
-
-        final KStreamBuilder builder = new KStreamBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder, props);
         streams.close();
         final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
@@ -109,12 +108,6 @@ public class KafkaStreamsTest {
 
     @Test(expected = IllegalStateException.class)
     public void testCannotStartOnceClosed() throws Exception {
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartOnceClosed");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-
-        final KStreamBuilder builder = new KStreamBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder, props);
         streams.start();
         streams.close();
         try {
@@ -129,12 +122,6 @@ public class KafkaStreamsTest {
 
     @Test(expected = IllegalStateException.class)
     public void testCannotStartTwice() throws Exception {
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartTwice");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-
-        final KStreamBuilder builder = new KStreamBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder, props);
         streams.start();
 
         try {
@@ -184,25 +171,21 @@ public class KafkaStreamsTest {
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
-        final KafkaStreams streams = createKafkaStreams();
         streams.allMetadata();
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
-        final KafkaStreams streams = createKafkaStreams();
         streams.allMetadataForStore("store");
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
-        final KafkaStreams streams = createKafkaStreams();
         streams.metadataForKey("store", "key", Serdes.String().serializer());
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
-        final KafkaStreams streams = createKafkaStreams();
         streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() {
             @Override
             public Integer partition(final String key, final Object value, final int numPartitions) {

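For reference, the lifecycle contract these tests pin down, sketched against the API used in this diff (application id and bootstrap servers are illustrative):

    final Properties props = new Properties();
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "lifecycle-demo");
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    final KafkaStreams streams = new KafkaStreams(new KStreamBuilder(), props);

    // state transitions: CREATED -> RUNNING -> NOT_RUNNING
    streams.setStateListener(new KafkaStreams.StateListener() {
        @Override
        public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
            System.out.println(oldState + " -> " + newState);
        }
    });

    streams.start();  // calling start() a second time throws IllegalStateException
    streams.close();  // idempotent: closing again is a no-op
    // a start() here would also throw: a closed instance cannot be restarted
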
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
new file mode 100644
index 0000000..85b851d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class GlobalKTableIntegrationTest {
+    private static final int NUM_BROKERS = 1;
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER =
+            new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    private static volatile int testNo = 0;
+    private final MockTime mockTime = CLUSTER.time;
+    private final KeyValueMapper<String, Long, Long> keyMapper = new KeyValueMapper<String, Long, Long>() {
+        @Override
+        public Long apply(final String key, final Long value) {
+            return value;
+        }
+    };
+    private final ValueJoiner<Long, String, String> joiner = new ValueJoiner<Long, String, String>() {
+        @Override
+        public String apply(final Long value1, final String value2) {
+            return value1 + "+" + value2;
+        }
+    };
+    private KStreamBuilder builder;
+    private Properties streamsConfiguration;
+    private KafkaStreams kafkaStreams;
+    private String globalOne;
+    private String inputStream;
+    private String inputTable;
+    private final String globalStore = "globalStore";
+    private GlobalKTable<Long, String> globalTable;
+    private KStream<String, Long> stream;
+    private KTable<String, Long> table;
+    final Map<String, String> results = new HashMap<>();
+    private ForeachAction<String, String> foreachAction;
+
+    @Before
+    public void before() {
+        testNo++;
+        builder = new KStreamBuilder();
+        createTopics();
+        streamsConfiguration = new Properties();
+        final String applicationId = "globalOne-table-test-" + testNo;
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration
+                .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), globalOne, globalStore);
+        stream = builder.stream(Serdes.String(), Serdes.Long(), inputStream);
+        table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table");
+        foreachAction = new ForeachAction<String, String>() {
+            @Override
+            public void apply(final String key, final String value) {
+                results.put(key, value);
+            }
+        };
+    }
+
+    @After
+    public void whenShuttingDown() throws IOException {
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    @Test
+    public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
+        final KStream<String, String> streamTableJoin = stream.leftJoin(globalTable, keyMapper, joiner);
+        streamTableJoin.foreach(foreachAction);
+        produceInitialGlobalTableValues();
+        startStreams();
+        produceTopicValues(inputStream);
+
+        final Map<String, String> expected = new HashMap<>();
+        expected.put("a", "1+A");
+        expected.put("b", "2+B");
+        expected.put("c", "3+C");
+        expected.put("d", "4+D");
+        expected.put("e", "5+null");
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return results.equals(expected);
+            }
+        }, 30000L, "waiting for initial values");
+
+
+        produceGlobalTableValues();
+
+        final ReadOnlyKeyValueStore<Long, String> replicatedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return "J".equals(replicatedStore.get(5L));
+            }
+        }, 30000, "waiting for data in replicated store");
+        produceTopicValues(inputStream);
+
+        expected.put("a", "1+F");
+        expected.put("b", "2+G");
+        expected.put("c", "3+H");
+        expected.put("d", "4+I");
+        expected.put("e", "5+J");
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return results.equals(expected);
+            }
+        }, 30000L, "waiting for final values");
+    }
+
+    @Test
+    public void shouldKStreamGlobalKTableJoin() throws Exception {
+        final KStream<String, String> streamTableJoin = stream.join(globalTable, keyMapper, joiner);
+        streamTableJoin.foreach(foreachAction);
+        produceInitialGlobalTableValues();
+        startStreams();
+        produceTopicValues(inputStream);
+
+        final Map<String, String> expected = new HashMap<>();
+        expected.put("a", "1+A");
+        expected.put("b", "2+B");
+        expected.put("c", "3+C");
+        expected.put("d", "4+D");
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return results.equals(expected);
+            }
+        }, 30000L, "waiting for initial values");
+
+
+        produceGlobalTableValues();
+
+        final ReadOnlyKeyValueStore<Long, String> replicatedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return "J".equals(replicatedStore.get(5L));
+            }
+        }, 30000, "waiting for data in replicated store");
+
+        produceTopicValues(inputStream);
+
+        expected.put("a", "1+F");
+        expected.put("b", "2+G");
+        expected.put("c", "3+H");
+        expected.put("d", "4+I");
+        expected.put("e", "5+J");
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return results.equals(expected);
+            }
+        }, 30000L, "waiting for final values");
+    }
+
+
+    private void createTopics() {
+        inputStream = "input-stream-" + testNo;
+        inputTable = "input-table-" + testNo;
+        globalOne = "globalOne-" + testNo;
+        CLUSTER.createTopic(inputStream);
+        CLUSTER.createTopic(inputTable);
+        CLUSTER.createTopic(globalOne, 2, 1);
+    }
+
+    private void startStreams() {
+        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams.start();
+    }
+
+    private void produceTopicValues(final String topic) throws java.util.concurrent.ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+                topic,
+                Arrays.asList(
+                        new KeyValue<>("a", 1L),
+                        new KeyValue<>("b", 2L),
+                        new KeyValue<>("c", 3L),
+                        new KeyValue<>("d", 4L),
+                        new KeyValue<>("e", 5L)),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        LongSerializer.class,
+                        new Properties()),
+                mockTime);
+    }
+
+    private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+                globalOne,
+                Arrays.asList(
+                        new KeyValue<>(1L, "A"),
+                        new KeyValue<>(2L, "B"),
+                        new KeyValue<>(3L, "C"),
+                        new KeyValue<>(4L, "D")),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        LongSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                mockTime);
+    }
+
+    private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+                globalOne,
+                Arrays.asList(
+                        new KeyValue<>(1L, "F"),
+                        new KeyValue<>(2L, "G"),
+                        new KeyValue<>(3L, "H"),
+                        new KeyValue<>(4L, "I"),
+                        new KeyValue<>(5L, "J")),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        LongSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                mockTime);
+    }
+
+
+}

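Distilled from the test above, the application-side shape of the new stream/global-table join is roughly as follows (a sketch: topic names and serdes mirror the test; keyMapper, joiner and streamsConfiguration are as defined in the fields and before() above):

    final KStreamBuilder builder = new KStreamBuilder();

    // A GlobalKTable is fully replicated to every instance, so the join needs
    // no repartitioning; the KeyValueMapper extracts the lookup key per record.
    final GlobalKTable<Long, String> globalTable =
            builder.globalTable(Serdes.Long(), Serdes.String(), "global-topic", "globalStore");
    final KStream<String, Long> stream =
            builder.stream(Serdes.String(), Serdes.Long(), "input-stream");

    final KStream<String, String> joined = stream.leftJoin(globalTable, keyMapper, joiner);

    final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
    streams.start();

    // The backing store is queryable once the instance is running:
    final ReadOnlyKeyValueStore<Long, String> store =
            streams.store("globalStore", QueryableStoreTypes.<Long, String>keyValueStore());
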
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 111a271..213fffe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -397,7 +397,8 @@ public class RegexSourceIntegrationTest {
         public volatile List<String> assignedTopicPartitions = new ArrayList<>();
 
         public TestStreamThread(final TopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier, final String applicationId, final String clientId, final UUID processId, final Metrics metrics, final Time time) {
-            super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder));
+            super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                  0);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index a154744..91b2127 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -21,6 +21,9 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -28,11 +31,14 @@ import org.apache.kafka.test.MockValueJoiner;
 import org.junit.After;
 import org.junit.Test;
 
+import java.util.HashSet;
+import java.util.List;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class KStreamBuilderTest {
 
@@ -154,6 +160,83 @@ public class KStreamBuilderTest {
     }
 
     @Test
+    public void shouldNotGroupGlobalTableWithOtherStreams() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final GlobalKTable<String, String> globalTable = builder.globalTable("table", "globalTable");
+        final KStream<String, String> stream = builder.stream("t1");
+        final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() {
+            @Override
+            public String apply(final String key, final String value) {
+                return value;
+            }
+        };
+        stream.leftJoin(globalTable, kvMapper, MockValueJoiner.TOSTRING_JOINER);
+        builder.stream("t2");
+        builder.setApplicationId("app-id");
+        final Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
+        assertEquals(Utils.mkSet("KTABLE-SOURCE-0000000001", "KSTREAM-SOURCE-0000000000"), nodeGroups.get(0));
+    }
+
+    @Test
+    public void shouldBuildSimpleGlobalTableTopology() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        builder.globalTable("table", "globalTable");
+        final ProcessorTopology topology = builder.buildGlobalStateTopology();
+        final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap = topology.storeToProcessorNodeMap();
+        assertEquals(1, stateStoreProcessorNodeMap.size());
+        final StateStore store = stateStoreProcessorNodeMap.keySet().iterator().next();
+        assertEquals("globalTable", store.name());
+        assertEquals("KTABLE-SOURCE-0000000001", stateStoreProcessorNodeMap.get(store).name());
+    }
+
+    @Test
+    public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        builder.globalTable("table", "globalTable");
+        builder.globalTable("table2", "globalTable2");
+        final ProcessorTopology topology = builder.buildGlobalStateTopology();
+        final List<StateStore> stateStores = topology.globalStateStores();
+        assertEquals(Utils.mkSet("table", "table2"), topology.sourceTopics());
+        assertEquals(2, stateStores.size());
+    }
+
+    @Test
+    public void shouldAddGlobalTablesToEachGroup() throws Exception {
+        final String one = "globalTable";
+        final String two = "globalTable2";
+        final KStreamBuilder builder = new KStreamBuilder();
+        final GlobalKTable<String, String> globalTable = builder.globalTable("table", one);
+        final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", two);
+
+        builder.table("not-global", "not-global");
+
+        final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() {
+            @Override
+            public String apply(final String key, final String value) {
+                return value;
+            }
+        };
+
+        final KStream<String, String> stream = builder.stream("t1");
+        stream.leftJoin(globalTable, kvMapper, MockValueJoiner.TOSTRING_JOINER);
+        final KStream<String, String> stream2 = builder.stream("t2");
+        stream2.leftJoin(globalTable2, kvMapper, MockValueJoiner.TOSTRING_JOINER);
+        builder.setApplicationId("app-id");
+        final Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
+        for (Integer groupId : nodeGroups.keySet()) {
+            final ProcessorTopology topology = builder.build(groupId);
+            final List<StateStore> stateStores = topology.globalStateStores();
+            final Set<String> names = new HashSet<>();
+            for (StateStore stateStore : stateStores) {
+                names.add(stateStore.name());
+            }
+            assertEquals(2, stateStores.size());
+            assertTrue(names.contains(one));
+            assertTrue(names.contains(two));
+        }
+    }
+
+    @Test
     public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         builder.setApplicationId("app-id");

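As the new builder tests suggest, every globalTable() call contributes a source topic plus a state-update processor to a dedicated global topology, and the resulting stores are attached to each node group. Sketched against the (internal) API exercised above:

    final KStreamBuilder builder = new KStreamBuilder();
    builder.globalTable("table", "globalTable");    // source topic, store name
    builder.globalTable("table2", "globalTable2");

    final ProcessorTopology global = builder.buildGlobalStateTopology();
    // global.sourceTopics() == {"table", "table2"}
    for (final StateStore store : global.globalStateStores()) {
        System.out.println(store.name());           // "globalTable", "globalTable2"
    }
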
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
new file mode 100644
index 0000000..bbc9741
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class GlobalKTableJoinsTest {
+
+    private final KStreamBuilder builder = new KStreamBuilder();
+    private GlobalKTable<String, String> global;
+    private File stateDir;
+    private final Map<String, String> results = new HashMap<>();
+    private KStream<String, String> stream;
+    private KeyValueMapper<String, String, String> keyValueMapper;
+    private ForeachAction<String, String> action;
+    private final String streamTopic = "stream";
+    private final String globalTopic = "global";
+
+    @Before
+    public void setUp() throws Exception {
+        stateDir = TestUtils.tempDirectory();
+        global = builder.globalTable(Serdes.String(), Serdes.String(), globalTopic, "global-store");
+        stream = builder.stream(Serdes.String(), Serdes.String(), streamTopic);
+        keyValueMapper = new KeyValueMapper<String, String, String>() {
+            @Override
+            public String apply(final String key, final String value) {
+                return value;
+            }
+        };
+        action = new ForeachAction<String, String>() {
+            @Override
+            public void apply(final String key, final String value) {
+                results.put(key, value);
+            }
+        };
+
+    }
+
+    @Test
+    public void shouldLeftJoinWithStream() throws Exception {
+        stream.leftJoin(global, keyValueMapper, MockValueJoiner.TOSTRING_JOINER)
+                .foreach(action);
+
+        final Map<String, String> expected = new HashMap<>();
+        expected.put("1", "a+A");
+        expected.put("2", "b+B");
+        expected.put("3", "c+null");
+
+        verifyJoin(expected, streamTopic);
+
+    }
+
+    @Test
+    public void shouldInnerJoinWithStream() throws Exception {
+        stream.join(global, keyValueMapper,  MockValueJoiner.TOSTRING_JOINER)
+                .foreach(action);
+
+        final Map<String, String> expected = new HashMap<>();
+        expected.put("1", "a+A");
+        expected.put("2", "b+B");
+
+        verifyJoin(expected, streamTopic);
+    }
+
+    private void verifyJoin(final Map<String, String> expected, final String joinInput) {
+        final KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
+        // write some data to the global table
+        driver.process(globalTopic, "a", "A");
+        driver.process(globalTopic, "b", "B");
+        //write some data to the stream
+        driver.process(joinInput, "1", "a");
+        driver.process(joinInput, "2", "b");
+        driver.process(joinInput, "3", "c");
+        driver.flushState();
+
+        assertEquals(expected, results);
+    }
+}

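Taken together, the two tests fix the join semantics; in compact form (keys and values from the expected maps above):

    // global table: {a -> A, b -> B}
    // stream:       ("1", "a"), ("2", "b"), ("3", "c")
    //
    // join     => {1=a+A, 2=b+B}            unmatched "c" record is dropped
    // leftJoin => {1=a+A, 2=b+B, 3=c+null}  unmatched records join against null
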
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 5271404..9aa2cc8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -27,6 +28,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.junit.Before;
@@ -269,4 +271,46 @@ public class KStreamImplTest {
         testStream.foreach(null);
     }
 
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTableOnJoinWithGlobalTable() throws Exception {
+        testStream.join((GlobalKTable) null,
+                        MockKeyValueMapper.<String, String>SelectValueMapper(),
+                        MockValueJoiner.TOSTRING_JOINER);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullMapperOnJoinWithGlobalTable() throws Exception {
+        testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), "global", "global"),
+                        null,
+                        MockValueJoiner.TOSTRING_JOINER);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() throws Exception {
+        testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), "global", "global"),
+                        MockKeyValueMapper.<String, String>SelectValueMapper(),
+                        null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTableOnLeftJoinWithGlobalTable() throws Exception {
+        testStream.leftJoin((GlobalKTable) null,
+                        MockKeyValueMapper.<String, String>SelectValueMapper(),
+                        MockValueJoiner.TOSTRING_JOINER);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() throws Exception {
+        testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), "global", "global"),
+                        null,
+                        MockValueJoiner.TOSTRING_JOINER);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() throws Exception {
+        testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), "global", "global"),
+                        MockKeyValueMapper.<String, String>SelectValueMapper(),
+                        null);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index 2eb522c..4bb64b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -348,4 +348,5 @@ public class KTableKTableJoinTest {
             }
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index b085a84..b545005 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -242,11 +242,11 @@ public class TopologyBuilderTest {
         StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
         builder.addStateStore(supplier);
         builder.setApplicationId("X");
+        builder.addSource("source-1", "topic-1");
+        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
 
         assertEquals(0, builder.build(null).stateStores().size());
 
-        builder.addSource("source-1", "topic-1");
-        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
         builder.connectProcessorAndStateStores("processor-1", "store-1");
 
         List<StateStore> suppliers = builder.build(null).stateStores();

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index cf161f8..9816b6b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -67,8 +67,8 @@ public class AbstractTaskTest {
                                                       Collections.<String, SinkNode>emptyMap(),
                                                       Collections.<StateStore>emptyList(),
                                                       Collections.<String, String>emptyMap(),
-                                                      Collections.<StateStore, ProcessorNode>emptyMap()
-                                               ),
+                                                      Collections.<StateStore, ProcessorNode>emptyMap(),
+                                                      Collections.<StateStore>emptyList()),
                                 consumer,
                                 consumer,
                                 false,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
new file mode 100644
index 0000000..2d10c13
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.LockException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.test.MockProcessorNode;
+import org.apache.kafka.test.NoOpProcessorContext;
+import org.apache.kafka.test.NoOpReadOnlyStore;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class GlobalStateManagerImplTest {
+
+
+    private final TopicPartition t1 = new TopicPartition("t1", 1);
+    private final TopicPartition t2 = new TopicPartition("t2", 1);
+    private GlobalStateManagerImpl stateManager;
+    private NoOpProcessorContext context;
+    private StateDirectory stateDirectory;
+    private String stateDirPath;
+    private NoOpReadOnlyStore<Object, Object> store1;
+    private NoOpReadOnlyStore store2;
+    private MockConsumer<byte[], byte[]> consumer;
+    private File checkpointFile;
+    private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+
+    @Before
+    public void before() throws IOException {
+        final Map<String, String> storeToTopic = new HashMap<>();
+        storeToTopic.put("t1-store", "t1");
+        storeToTopic.put("t2-store", "t2");
+
+        final Map<StateStore, ProcessorNode> storeToProcessorNode = new HashMap<>();
+        store1 = new NoOpReadOnlyStore<>("t1-store");
+        storeToProcessorNode.put(store1, new MockProcessorNode(-1));
+        store2 = new NoOpReadOnlyStore("t2-store");
+        storeToProcessorNode.put(store2, new MockProcessorNode(-1));
+        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
+                                                                 Collections.<String, SourceNode>emptyMap(),
+                                                                 Collections.<String, SinkNode>emptyMap(),
+                                                                 Collections.<StateStore>emptyList(),
+                                                                 storeToTopic,
+                                                                 storeToProcessorNode,
+                                                                 Arrays.<StateStore>asList(store1, store2));
+
+        context = new NoOpProcessorContext();
+        stateDirPath = TestUtils.tempDirectory().getPath();
+        stateDirectory = new StateDirectory("appId", stateDirPath);
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        stateManager = new GlobalStateManagerImpl(topology, consumer, stateDirectory);
+        checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
+    }
+
+    @After
+    public void after() throws IOException {
+        stateDirectory.unlockGlobalState();
+    }
+
+    @Test
+    public void shouldLockGlobalStateDirectory() throws Exception {
+        stateManager.initialize(context);
+        assertTrue(new File(stateDirectory.globalStateDir(), ".lock").exists());
+    }
+
+    @Test(expected = LockException.class)
+    public void shouldThrowLockExceptionIfCantGetLock() throws Exception {
+        final StateDirectory stateDir = new StateDirectory("appId", stateDirPath);
+        try {
+            stateDir.lockGlobalState(1);
+            stateManager.initialize(context);
+        } finally {
+            stateDir.unlockGlobalState();
+        }
+    }
+
+    @Test
+    public void shouldReadCheckpointOffsets() throws Exception {
+        final Map<TopicPartition, Long> expected = writeCheckpoint();
+
+        stateManager.initialize(context);
+        final Map<TopicPartition, Long> offsets = stateManager.checkpointedOffsets();
+        assertEquals(expected, offsets);
+    }
+
+    @Test
+    public void shouldDeleteCheckpointFileAfterLoaded() throws Exception {
+        writeCheckpoint();
+        stateManager.initialize(context);
+        assertFalse(checkpointFile.exists());
+    }
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionIfFailedToReadCheckpointedOffsets() throws Exception {
+        final File checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
+        try (final FileOutputStream stream = new FileOutputStream(checkpointFile)) {
+            stream.write("0\n1\nblah".getBytes());
+        }
+        stateManager.initialize(context);
+    }
+
+    @Test
+    public void shouldInitializeStateStores() throws Exception {
+        stateManager.initialize(context);
+        assertTrue(store1.initialized);
+        assertTrue(store2.initialized);
+    }
+
+    @Test
+    public void shouldReturnInitializedStoreNames() throws Exception {
+        final Set<String> storeNames = stateManager.initialize(context);
+        assertEquals(Utils.mkSet(store1.name(), store2.name()), storeNames);
+    }
+
+    @Test
+    public void shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() throws Exception {
+        stateManager.initialize(context);
+
+        try {
+            stateManager.register(new NoOpReadOnlyStore<>("not-in-topology"), false, new TheStateRestoreCallback());
+            fail("should have raised an illegal argument exception as store is not in the topology");
+        } catch (final IllegalArgumentException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() throws Exception {
+        stateManager.initialize(context);
+        initializeConsumer(2, 1, t1);
+        stateManager.register(store1, false, new TheStateRestoreCallback());
+        try {
+            stateManager.register(store1, false, new TheStateRestoreCallback());
+            fail("should have raised an illegal argument exception as store has already been registered");
+        } catch (final IllegalArgumentException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() throws Exception {
+        stateManager.initialize(context);
+        try {
+            stateManager.register(store1, false, new TheStateRestoreCallback());
+            fail("Should have raised a StreamsException as there are no partition for the store");
+        } catch (final StreamsException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void shouldRestoreRecordsUpToHighwatermark() throws Exception {
+        initializeConsumer(2, 1, t1);
+
+        stateManager.initialize(context);
+
+        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        stateManager.register(store1, false, stateRestoreCallback);
+        assertEquals(2, stateRestoreCallback.restored.size());
+    }
+
+    @Test
+    public void shouldRestoreRecordsFromCheckpointToHighwatermark() throws Exception {
+        initializeConsumer(5, 6, t1);
+
+        final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
+                                                                                ProcessorStateManager.CHECKPOINT_FILE_NAME));
+        offsetCheckpoint.write(Collections.singletonMap(t1, 6L));
+
+        stateManager.initialize(context);
+        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        stateManager.register(store1, false, stateRestoreCallback);
+        assertEquals(5, stateRestoreCallback.restored.size());
+    }
+
+
+    @Test
+    public void shouldFlushStateStores() throws Exception {
+        stateManager.initialize(context);
+        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        // register the stores
+        initializeConsumer(1, 1, t1);
+        stateManager.register(store1, false, stateRestoreCallback);
+        initializeConsumer(1, 1, t2);
+        stateManager.register(store2, false, stateRestoreCallback);
+
+        stateManager.flush(context);
+        assertTrue(store1.flushed);
+        assertTrue(store2.flushed);
+    }
+
+    @Test(expected = ProcessorStateException.class)
+    public void shouldThrowProcessorStateExceptionIfStoreFlushFailed() throws Exception {
+        stateManager.initialize(context);
+        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        // register the stores
+        initializeConsumer(1, 1, t1);
+        stateManager.register(new NoOpReadOnlyStore(store1.name()) {
+            @Override
+            public void flush() {
+                throw new RuntimeException("KABOOM!");
+            }
+        }, false, stateRestoreCallback);
+
+        stateManager.flush(context);
+    }
+
+    @Test
+    public void shouldCloseStateStores() throws Exception {
+        stateManager.initialize(context);
+        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        // register the stores
+        initializeConsumer(1, 1, t1);
+        stateManager.register(store1, false, stateRestoreCallback);
+        initializeConsumer(1, 1, t2);
+        stateManager.register(store2, false, stateRestoreCallback);
+
+        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+        assertFalse(store1.isOpen());
+        assertFalse(store2.isOpen());
+    }
+
+    @Test
+    public void shouldWriteCheckpointsOnClose() throws Exception {
+        stateManager.initialize(context);
+        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        initializeConsumer(1, 1, t1);
+        stateManager.register(store1, false, stateRestoreCallback);
+        final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 25L);
+        stateManager.close(expected);
+        final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
+                                                                                ProcessorStateManager.CHECKPOINT_FILE_NAME));
+        final Map<TopicPartition, Long> result = offsetCheckpoint.read();
+        assertEquals(expected, result);
+    }
+
+    @Test(expected = ProcessorStateException.class)
+    public void shouldThrowProcessorStateExceptionIfStoreCloseFailed() throws Exception {
+        stateManager.initialize(context);
+        initializeConsumer(1, 1, t1);
+        stateManager.register(new NoOpReadOnlyStore(store1.name()) {
+            @Override
+            public void close() {
+                throw new RuntimeException("KABOOM!");
+            }
+        }, false, stateRestoreCallback);
+
+        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+    }
+
+    @Test
+    public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() throws Exception {
+        stateManager.initialize(context);
+        try {
+            stateManager.register(store1, false, null);
+            fail("should have thrown due to null callback");
+        } catch (IllegalArgumentException e) {
+            //pass
+        }
+    }
+
+    @Test
+    public void shouldUnlockGlobalStateDirectoryOnClose() throws Exception {
+        stateManager.initialize(context);
+        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+        final StateDirectory stateDir = new StateDirectory("appId", stateDirPath);
+        try {
+            // should be able to get the lock now as it should've been released in close
+            assertTrue(stateDir.lockGlobalState(1));
+        } finally {
+            stateDir.unlockGlobalState();
+        }
+    }
+
+    @Test
+    public void shouldNotCloseStoresIfCloseAlreadyCalled() throws Exception {
+        stateManager.initialize(context);
+        initializeConsumer(1, 1, t1);
+        stateManager.register(new NoOpReadOnlyStore("t1-store") {
+            @Override
+            public void close() {
+                if (!isOpen()) {
+                    throw new RuntimeException("store already closed");
+                }
+                super.close();
+            }
+        }, false, stateRestoreCallback);
+        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+
+
+        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+    }
+
+    @Test
+    public void shouldAttemptToCloseAllStoresEvenWhenSomeException() throws Exception {
+        stateManager.initialize(context);
+        initializeConsumer(1, 1, t1);
+        initializeConsumer(1, 1, t2);
+        final NoOpReadOnlyStore store = new NoOpReadOnlyStore("t1-store") {
+            @Override
+            public void close() {
+                super.close();
+                throw new RuntimeException("KABOOM!");
+            }
+        };
+        stateManager.register(store, false, stateRestoreCallback);
+
+        stateManager.register(store2, false, stateRestoreCallback);
+
+        try {
+            stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+        } catch (ProcessorStateException e) {
+            // expected
+        }
+        assertFalse(store.isOpen());
+        assertFalse(store2.isOpen());
+    }
+
+    @Test
+    public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws Exception {
+        final File checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
+        try (final FileOutputStream stream = new FileOutputStream(checkpointFile)) {
+            stream.write("0\n1\nblah".getBytes());
+        }
+        try {
+            stateManager.initialize(context);
+        } catch (StreamsException e) {
+            // expected
+        }
+        final StateDirectory stateDir = new StateDirectory("appId", stateDirPath);
+        try {
+            // should be able to get the lock now as it should've been released
+            assertTrue(stateDir.lockGlobalState(1));
+        } finally {
+            stateDir.unlockGlobalState();
+        }
+    }
+
+    private void initializeConsumer(final long numRecords, final long startOffset, final TopicPartition topicPartition) {
+        final HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
+        startOffsets.put(topicPartition, 1L);
+        final HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, startOffset + numRecords - 1);
+        consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, null, null)));
+        consumer.assign(Collections.singletonList(topicPartition));
+        consumer.updateEndOffsets(endOffsets);
+        consumer.updateBeginningOffsets(startOffsets);
+
+        for (int i = 0; i < numRecords; i++) {
+            consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), startOffset + i, "key".getBytes(), "value".getBytes()));
+        }
+    }
+
+    private Map<TopicPartition, Long> writeCheckpoint() throws IOException {
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(checkpointFile);
+        final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 1L);
+        checkpoint.write(expected);
+        return expected;
+    }
+
+    private static class TheStateRestoreCallback implements StateRestoreCallback {
+        private final List<KeyValue<byte[], byte[]>> restored = new ArrayList<>();
+
+        @Override
+        public void restore(final byte[] key, final byte[] value) {
+            restored.add(KeyValue.pair(key, value));
+        }
+    }
+}
\ No newline at end of file

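Much of this class revolves around the checkpoint file that initialize() loads (and then deletes) and close() rewrites. The OffsetCheckpoint round trip it relies on, sketched (stateDir stands in for the manager's base directory):

    final File file = new File(stateDir, ProcessorStateManager.CHECKPOINT_FILE_NAME);
    final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);

    // written on close(): one committed offset per global partition
    checkpoint.write(Collections.singletonMap(new TopicPartition("t1", 1), 25L));

    // read back on the next initialize(); restoration then resumes from the
    // checkpointed offset up to the partition's current end offset
    final Map<TopicPartition, Long> offsets = checkpoint.read();
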
http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
new file mode 100644
index 0000000..afb8e76
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.test.GlobalStateManagerStub;
+import org.apache.kafka.test.MockProcessorNode;
+import org.apache.kafka.test.MockSourceNode;
+import org.apache.kafka.test.NoOpProcessorContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class GlobalStateTaskTest {
+
+    private Map<TopicPartition, Long> offsets;
+    private GlobalStateUpdateTask globalStateTask;
+    private GlobalStateManagerStub stateMgr;
+    private List<ProcessorNode> processorNodes;
+    private NoOpProcessorContext context;
+    private TopicPartition t1;
+    private TopicPartition t2;
+    private MockSourceNode sourceOne;
+    private MockSourceNode sourceTwo;
+
+    @Before
+    public void before() {
+        sourceOne = new MockSourceNode<>(new String[]{"t1"},
+                                         new StringDeserializer(),
+                                         new StringDeserializer());
+        sourceTwo = new MockSourceNode<>(new String[]{"t2"},
+                                         new IntegerDeserializer(),
+                                         new IntegerDeserializer());
+        processorNodes = Arrays.asList(sourceOne, sourceTwo, new MockProcessorNode<>(-1), new MockProcessorNode<>(-1));
+        final Set<String> storeNames = Utils.mkSet("t1-store", "t2-store");
+        final Map<String, SourceNode> sourceByTopics = new HashMap<>();
+        sourceByTopics.put("t1", sourceOne);
+        sourceByTopics.put("t2", sourceTwo);
+        final Map<String, String> storeToTopic = new HashMap<>();
+        storeToTopic.put("t1-store", "t1");
+        storeToTopic.put("t2-store", "t2");
+        final ProcessorTopology topology = new ProcessorTopology(processorNodes,
+                                                                 sourceByTopics,
+                                                                 Collections.<String, SinkNode>emptyMap(),
+                                                                 Collections.<StateStore>emptyList(),
+                                                                 storeToTopic,
+                                                                 Collections.<StateStore, ProcessorNode>emptyMap(),
+                                                                 Collections.<StateStore>emptyList());
+        context = new NoOpProcessorContext();
+
+        t1 = new TopicPartition("t1", 1);
+        t2 = new TopicPartition("t2", 1);
+        offsets = new HashMap<>();
+        offsets.put(t1, 50L);
+        offsets.put(t2, 100L);
+        stateMgr = new GlobalStateManagerStub(storeNames, offsets);
+        globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr);
+    }
+
+    @Test
+    public void shouldInitializeStateManager() throws Exception {
+        final Map<TopicPartition, Long> startingOffsets = globalStateTask.initialize();
+        assertTrue(stateMgr.initialized);
+        assertEquals(offsets, startingOffsets);
+    }
+
+    @Test
+    public void shouldInitializeContext() throws Exception {
+        globalStateTask.initialize();
+        assertTrue(context.initialized);
+    }
+
+    @Test
+    public void shouldInitializeProcessorTopology() throws Exception {
+        globalStateTask.initialize();
+        for (ProcessorNode processorNode : processorNodes) {
+            if (processorNode instanceof MockProcessorNode) {
+                assertTrue(((MockProcessorNode) processorNode).initialized);
+            } else {
+                assertTrue(((MockSourceNode) processorNode).initialized);
+            }
+        }
+    }
+
+    @Test
+    public void shouldProcessRecordsForTopic() throws Exception {
+        globalStateTask.initialize();
+        globalStateTask.update(new ConsumerRecord<>("t1", 1, 1, "foo".getBytes(), "bar".getBytes()));
+        assertEquals(1, sourceOne.numReceived);
+        assertEquals(0, sourceTwo.numReceived);
+    }
+
+    @Test
+    public void shouldProcessRecordsForOtherTopic() throws Exception {
+        final byte[] integerBytes = new IntegerSerializer().serialize("foo", 1);
+        globalStateTask.initialize();
+        globalStateTask.update(new ConsumerRecord<>("t2", 1, 1, integerBytes, integerBytes));
+        assertEquals(1, sourceTwo.numReceived);
+        assertEquals(0, sourceOne.numReceived);
+    }
+
+
+    @Test
+    public void shouldCloseStateManagerWithOffsets() throws Exception {
+        final Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
+        expectedOffsets.put(t1, 52L);
+        expectedOffsets.put(t2, 100L);
+        globalStateTask.initialize();
+        globalStateTask.update(new ConsumerRecord<>("t1", 1, 51, "foo".getBytes(), "foo".getBytes()));
+        globalStateTask.close();
+        assertEquals(expectedOffsets, stateMgr.checkpointedOffsets());
+        assertTrue(stateMgr.closed);
+    }
+}
\ No newline at end of file
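A note on shouldCloseStateManagerWithOffsets above: the checkpointed value for a partition is the offset of the last record handed to update() plus one, i.e. the position from which a restarted instance should resume consuming. In sketch form (illustrative, mirroring the lifecycle the tests exercise):

    // Illustrative sketch of the task lifecycle tested above:
    // initialize() returns the offsets to seek to, update() is called per
    // record, and close() checkpoints lastProcessedOffset + 1 per partition.
    final Map<TopicPartition, Long> startingOffsets = globalStateTask.initialize();
    globalStateTask.update(new ConsumerRecord<>("t1", 1, 51, "k".getBytes(), "v".getBytes()));
    globalStateTask.close();   // checkpoints t1 -> 52, t2 -> 100 (unchanged)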

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
new file mode 100644
index 0000000..67138f7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class GlobalStreamThreadTest {
+    private final KStreamBuilder builder = new KStreamBuilder();
+    private final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+    private GlobalStreamThread globalStreamThread;
+
+    @Before
+    public void before() {
+        builder.globalTable("foo", "bar");
+        final HashMap<String, Object> properties = new HashMap<>();
+        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah");
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "blah");
+        final StreamsConfig config = new StreamsConfig(properties);
+        globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(),
+                                                    config,
+                                                    mockConsumer,
+                                                    new StateDirectory("appId", TestUtils.tempDirectory().getPath()),
+                                                    new Metrics(),
+                                                    new MockTime(),
+                                                    "client");
+    }
+
+    @Test
+    public void shouldThrowStreamsExceptionOnStartupIfThereIsAnException() throws Exception {
+        // should throw as the MockConsumer hasn't been configured and there are no
+        // partitions available
+        try {
+            globalStreamThread.start();
+            fail("Should have thrown StreamsException if start up failed");
+        } catch (StreamsException e) {
+            // ok
+        }
+        assertFalse(globalStreamThread.stillRunning());
+    }
+
+
+    @Test
+    public void shouldBeRunningAfterSuccessfulStart() throws Exception {
+        initializeConsumer();
+        globalStreamThread.start();
+        assertTrue(globalStreamThread.stillRunning());
+    }
+
+    @Test(timeout = 30000)
+    public void shouldStopRunningWhenClosedByUser() throws Exception {
+        initializeConsumer();
+        globalStreamThread.start();
+        globalStreamThread.close();
+        globalStreamThread.join();
+    }
+
+    @Test
+    public void shouldCloseStateStoresOnClose() throws Exception {
+        initializeConsumer();
+        globalStreamThread.start();
+        final StateStore globalStore = builder.globalStateStores().get("bar");
+        assertTrue(globalStore.isOpen());
+        globalStreamThread.close();
+        globalStreamThread.join();
+        assertFalse(globalStore.isOpen());
+    }
+
+    private void initializeConsumer() {
+        mockConsumer.updatePartitions("foo", Collections.singletonList(new PartitionInfo("foo",
+                                                                                         0,
+                                                                                         null,
+                                                                                         new Node[0],
+                                                                                         new Node[0])));
+        final TopicPartition topicPartition = new TopicPartition("foo", 0);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
+        mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 0L));
+    }
+
+
+
+}
\ No newline at end of file
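For context, GlobalStreamThread backs the public global-table API added by this patch; an application would reach it roughly as below (an illustrative sketch, reusing the topic and store names from the test above, with props assumed to carry the usual application id and bootstrap servers):

    // Illustrative sketch (not part of the commit): a global table is
    // replicated in full to every instance and is maintained by a dedicated
    // GlobalStreamThread rather than by the regular stream threads.
    final KStreamBuilder builder = new KStreamBuilder();
    builder.globalTable("foo", "bar");   // source topic "foo", store name "bar"
    final KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();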

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index f8c080c..ecb9f27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -41,8 +41,8 @@ public class PartitionGroupTest {
     private final String[] topics = {"topic"};
     private final TopicPartition partition1 = new TopicPartition(topics[0], 1);
     private final TopicPartition partition2 = new TopicPartition(topics[0], 2);
-    private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(topics, intDeserializer, intDeserializer));
-    private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(topics, intDeserializer, intDeserializer));
+    private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor);
+    private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor);
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 3301744..71c234e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -135,6 +136,7 @@ public class ProcessorTopologyTest {
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5", partition);
     }
 
+
     @Test
     public void testDrivingMultiplexingTopology() {
         driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology());
@@ -196,6 +198,24 @@ public class ProcessorTopologyTest {
         assertNull(store.get("key4"));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldDriveGlobalStore() throws Exception {
+        final StateStoreSupplier storeSupplier = Stores.create("my-store")
+                .withStringKeys().withStringValues().inMemory().disableLogging().build();
+        final String global = "global";
+        final String topic = "topic";
+        final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) storeSupplier.get();
+        final TopologyBuilder topologyBuilder = new TopologyBuilder()
+                .addGlobalStore(globalStore, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor("my-store")));
+
+        driver = new ProcessorTopologyTestDriver(config, topologyBuilder, "my-store");
+        driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertEquals("value1", globalStore.get("key1"));
+        assertEquals("value2", globalStore.get("key2"));
+    }
+
     @Test
     public void testDrivingSimpleMultiSourceTopology() {
         int partition = 10;
@@ -211,6 +231,7 @@ public class ProcessorTopologyTest {
     }
 
 
+
     protected void assertNextOutputRecord(String topic, String key, String value) {
         ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
         assertEquals(topic, record.topic());

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index e0ee3ce..a10e8f8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -45,7 +45,9 @@ public class RecordQueueTest {
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
     private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
     private final String[] topics = {"topic"};
-    private final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1), new MockSourceNode<>(topics, intDeserializer, intDeserializer));
+    private final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1),
+                                                      new MockSourceNode<>(topics, intDeserializer, intDeserializer),
+                                                      timestampExtractor);
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
@@ -61,7 +63,7 @@ public class RecordQueueTest {
             new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
             new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
 
-        queue.addRawRecords(list1, timestampExtractor);
+        queue.addRawRecords(list1);
 
         assertEquals(3, queue.size());
         assertEquals(1L, queue.timestamp());
@@ -83,7 +85,7 @@ public class RecordQueueTest {
             new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
             new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
 
-        queue.addRawRecords(list2, timestampExtractor);
+        queue.addRawRecords(list2);
 
         assertEquals(4, queue.size());
         assertEquals(3L, queue.timestamp());
@@ -110,7 +112,7 @@ public class RecordQueueTest {
             new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
             new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
 
-        queue.addRawRecords(list3, timestampExtractor);
+        queue.addRawRecords(list3);
 
         assertEquals(3, queue.size());
         assertEquals(4L, queue.timestamp());
@@ -127,7 +129,7 @@ public class RecordQueueTest {
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
                 new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, recordValue));
 
-        queue.addRawRecords(records, timestampExtractor);
+        queue.addRawRecords(records);
     }
 
     @Test(expected = StreamsException.class)
@@ -136,25 +138,29 @@ public class RecordQueueTest {
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
                 new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, value));
 
-        queue.addRawRecords(records, timestampExtractor);
+        queue.addRawRecords(records);
     }
 
     @Test(expected = StreamsException.class)
     public void shouldThrowOnNegativeTimestamp() {
-        final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
                 new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
 
-        queue.addRawRecords(records, new FailOnInvalidTimestamp());
+        final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1),
+                                                  new MockSourceNode<>(topics, intDeserializer, intDeserializer),
+                                                  new FailOnInvalidTimestamp());
+        queue.addRawRecords(records);
     }
 
     @Test
     public void shouldDropOnNegativeTimestamp() {
-        final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
                 new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
 
-        queue.addRawRecords(records, new LogAndSkipOnInvalidTimestamp());
+        final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1),
+                                                  new MockSourceNode<>(topics, intDeserializer, intDeserializer),
+                                                  new LogAndSkipOnInvalidTimestamp());
+        queue.addRawRecords(records);
 
         assertEquals(0, queue.size());
     }
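The RecordQueueTest changes above, together with the earlier PartitionGroupTest hunk, capture this patch's API change: the TimestampExtractor moves from a per-call argument of addRawRecords() to a constructor argument of RecordQueue. In sketch form (mirroring the "+" lines above):

    // Before this patch: the extractor was supplied with every batch.
    //   queue.addRawRecords(records, timestampExtractor);
    // After this patch: the extractor is bound once, at construction.
    final RecordQueue queue = new RecordQueue(
            new TopicPartition("topic", 1),
            new MockSourceNode<>(topics, intDeserializer, intDeserializer),
            new LogAndSkipOnInvalidTimestamp());
    queue.addRawRecords(records);   // no extractor parameter any more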

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
new file mode 100644
index 0000000..fdd9127
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class SourceNodeRecordDeserializerTest {
+
+    private final ConsumerRecord<byte[], byte[]> rawRecord = new ConsumerRecord<>("topic",
+                                                                                  1,
+                                                                                  1,
+                                                                                  10,
+                                                                                  TimestampType.LOG_APPEND_TIME,
+                                                                                  5,
+                                                                                  3,
+                                                                                  5,
+                                                                                  new byte[0],
+                                                                                  new byte[0]);
+
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionIfKeyFailsToDeserialize() throws Exception {
+        final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
+                new TheSourceNode(true, false));
+        recordDeserializer.deserialize(rawRecord);
+    }
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionIfValueFailsToDeserialize() throws Exception {
+        final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
+                new TheSourceNode(false, true));
+        recordDeserializer.deserialize(rawRecord);
+    }
+
+    @Test
+    public void shouldReturnNewConsumerRecordWithDeserializedValueWhenNoExceptions() throws Exception {
+        final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
+                new TheSourceNode(false, false, "key", "value"));
+        final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(rawRecord);
+        assertEquals(rawRecord.topic(), record.topic());
+        assertEquals(rawRecord.partition(), record.partition());
+        assertEquals(rawRecord.offset(), record.offset());
+        assertEquals(rawRecord.checksum(), record.checksum());
+        assertEquals("key", record.key());
+        assertEquals("value", record.value());
+        assertEquals(rawRecord.timestamp(), record.timestamp());
+        assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+    }
+
+    static class TheSourceNode extends SourceNode {
+        private final boolean keyThrowsException;
+        private final boolean valueThrowsException;
+        private final Object key;
+        private final Object value;
+
+        TheSourceNode(final boolean keyThrowsException, final boolean valueThrowsException) {
+            this(keyThrowsException, valueThrowsException, null, null);
+        }
+
+        @SuppressWarnings("unchecked")
+        TheSourceNode(final boolean keyThrowsException,
+                      final boolean valueThrowsException,
+                      final Object key,
+                      final Object value) {
+            super("", new String[0], null, null);
+            this.keyThrowsException = keyThrowsException;
+            this.valueThrowsException = valueThrowsException;
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public Object deserializeKey(final String topic, final byte[] data) {
+            if (keyThrowsException) {
+                throw new RuntimeException();
+            }
+            return key;
+        }
+
+        @Override
+        public Object deserializeValue(final String topic, final byte[] data) {
+            if (valueThrowsException) {
+                throw new RuntimeException();
+            }
+            return value;
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8079c980/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index c5fecfa..fdf0115 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -81,7 +81,8 @@ public class StandbyTaskTest {
                     new MockStateStoreSupplier(storeName2, true).get()
             ),
             Collections.<String, String>emptyMap(),
-            Collections.<StateStore, ProcessorNode>emptyMap());
+            Collections.<StateStore, ProcessorNode>emptyMap(),
+            Collections.<StateStore>emptyList());
 
     private final TopicPartition ktable = new TopicPartition("ktable1", 0);
     private final Set<TopicPartition> ktablePartitions = Utils.mkSet(ktable);
@@ -97,7 +98,8 @@ public class StandbyTaskTest {
                 put("ktable1", ktable.topic());
             }
         },
-            Collections.<StateStore, ProcessorNode>emptyMap());
+            Collections.<StateStore, ProcessorNode>emptyMap(),
+            Collections.<StateStore>emptyList());
     private File baseDir;
     private StateDirectory stateDirectory;
 

