kafka-commits mailing list archives

From guozh...@apache.org
Subject [4/6] kafka git commit: KIP-28: Add a processor client for Kafka Streaming
Date Sat, 26 Sep 2015 00:24:23 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
new file mode 100644
index 0000000..b350222
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.RestoreFunc;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ProcessorContextImpl implements ProcessorContext {
+
+    private static final Logger log = LoggerFactory.getLogger(ProcessorContextImpl.class);
+
+    private final int id;
+    private final StreamTask task;
+    private final Metrics metrics;
+    private final RecordCollector collector;
+    private final ProcessorStateManager stateMgr;
+
+    private final Serializer<?> keySerializer;
+    private final Serializer<?> valSerializer;
+    private final Deserializer<?> keyDeserializer;
+    private final Deserializer<?> valDeserializer;
+
+    private boolean initialized;
+
+    @SuppressWarnings("unchecked")
+    public ProcessorContextImpl(int id,
+                                StreamTask task,
+                                StreamingConfig config,
+                                RecordCollector collector,
+                                ProcessorStateManager stateMgr,
+                                Metrics metrics) {
+        this.id = id;
+        this.task = task;
+        this.metrics = metrics;
+        this.collector = collector;
+        this.stateMgr = stateMgr;
+
+        this.keySerializer = config.getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+        this.valSerializer = config.getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+        this.keyDeserializer = config.getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+        this.valDeserializer = config.getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+
+        this.initialized = false;
+    }
+
+    public RecordCollector recordCollector() {
+        return this.collector;
+    }
+
+    public void initialized() {
+        this.initialized = true;
+    }
+
+    @Override
+    public boolean joinable() {
+        Set<TopicPartition> partitions = this.task.partitions();
+        Map<Integer, List<String>> partitionsById = new HashMap<>();
+        int firstId = -1;
+        for (TopicPartition partition : partitions) {
+            if (!partitionsById.containsKey(partition.partition())) {
+                partitionsById.put(partition.partition(), new ArrayList<String>());
+            }
+            partitionsById.get(partition.partition()).add(partition.topic());
+
+            if (firstId < 0)
+                firstId = partition.partition();
+        }
+
+        List<String> topics = partitionsById.get(firstId);
+        for (List<String> topicsPerPartition : partitionsById.values()) {
+            if (topics.size() != topicsPerPartition.size())
+                return false;
+
+            for (String topic : topicsPerPartition) {
+                if (!topics.contains(topic))
+                    return false;
+            }
+        }
+
+        return true;
+    }
+
+    @Override
+    public int id() {
+        return id;
+    }
+
+    @Override
+    public Serializer<?> keySerializer() {
+        return this.keySerializer;
+    }
+
+    @Override
+    public Serializer<?> valueSerializer() {
+        return this.valSerializer;
+    }
+
+    @Override
+    public Deserializer<?> keyDeserializer() {
+        return this.keyDeserializer;
+    }
+
+    @Override
+    public Deserializer<?> valueDeserializer() {
+        return this.valDeserializer;
+    }
+
+    @Override
+    public File stateDir() {
+        return stateMgr.baseDir();
+    }
+
+    @Override
+    public Metrics metrics() {
+        return metrics;
+    }
+
+    @Override
+    public void register(StateStore store, RestoreFunc restoreFunc) {
+        if (initialized)
+            throw new KafkaException("Can only create state stores during initialization.");
+
+        stateMgr.register(store, restoreFunc);
+    }
+
+    @Override
+    public StateStore getStateStore(String name) {
+        return stateMgr.getStore(name);
+    }
+
+    @Override
+    public String topic() {
+        if (task.record() == null)
+            throw new IllegalStateException("this should not happen as topic() should only be called while a record is processed");
+
+        return task.record().topic();
+    }
+
+    @Override
+    public int partition() {
+        if (task.record() == null)
+            throw new IllegalStateException("this should not happen as partition() should only be called while a record is processed");
+
+        return task.record().partition();
+    }
+
+    @Override
+    public long offset() {
+        if (this.task.record() == null)
+            throw new IllegalStateException("this should not happen as offset() should only be called while a record is processed");
+
+        return this.task.record().offset();
+    }
+
+    @Override
+    public long timestamp() {
+        if (task.record() == null)
+            throw new IllegalStateException("this should not happen as timestamp() should only be called while a record is processed");
+
+        return task.record().timestamp;
+    }
+
+    @Override
+    public <K, V> void forward(K key, V value) {
+        task.forward(key, value);
+    }
+
+    @Override
+    public <K, V> void forward(K key, V value, int childIndex) {
+        task.forward(key, value, childIndex);
+    }
+
+    @Override
+    public void commit() {
+        task.needCommit();
+    }
+
+    @Override
+    public void schedule(long interval) {
+        task.schedule(interval);
+    }
+}

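For context, this is the object handed to user processors. A minimal sketch of how a Processor implementation might use it is shown below; the class name is illustrative, and it assumes the Processor interface added elsewhere in this commit exposes the init(ProcessorContext)/process(K, V)/punctuate(long)/close() methods that ProcessorNode (the next file in this message) invokes:

    public class PassThroughProcessor implements Processor<String, String> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            // state stores may only be registered and punctuations scheduled here,
            // before ProcessorContextImpl.initialized() is called by the task
            context.schedule(1000L);
        }

        @Override
        public void process(String key, String value) {
            context.forward(key, value);   // hand the record to the downstream nodes
        }

        @Override
        public void punctuate(long streamTime) {
            context.commit();              // request that the owning task commits its state
        }

        @Override
        public void close() {}
    }
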
http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
new file mode 100644
index 0000000..9127c3f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ProcessorNode<K, V> {
+
+    private final List<ProcessorNode<?, ?>> children;
+
+    private final String name;
+    private final Processor<K, V> processor;
+
+    public ProcessorNode(String name) {
+        this(name, null);
+    }
+
+    public ProcessorNode(String name, Processor<K, V> processor) {
+        this.name = name;
+        this.processor = processor;
+        this.children = new ArrayList<>();
+    }
+
+    public final String name() {
+        return name;
+    }
+
+    public final Processor processor() {
+        return processor;
+    }
+
+    public final List<ProcessorNode<?, ?>> children() {
+        return children;
+    }
+
+    public void addChild(ProcessorNode<?, ?> child) {
+        children.add(child);
+    }
+
+    public void init(ProcessorContext context) {
+        processor.init(context);
+    }
+
+    public void process(K key, V value) {
+        processor.process(key, value);
+    }
+
+    public void close() {
+        processor.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
new file mode 100644
index 0000000..2f1fb35
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.RestoreFunc;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.state.OffsetCheckpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ProcessorStateManager {
+
+    private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class);
+
+    public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    public static final String LOCK_FILE_NAME = ".lock";
+
+    private final int id;
+    private final File baseDir;
+    private final FileLock directoryLock;
+    private final Map<String, StateStore> stores;
+    private final Consumer<byte[], byte[]> restoreConsumer;
+    private final Map<TopicPartition, Long> restoredOffsets;
+    private final Map<TopicPartition, Long> checkpointedOffsets;
+
+    public ProcessorStateManager(int id, File baseDir, Consumer<byte[], byte[]> restoreConsumer) throws IOException {
+        this.id = id;
+        this.baseDir = baseDir;
+        this.stores = new HashMap<>();
+        this.restoreConsumer = restoreConsumer;
+        this.restoredOffsets = new HashMap<>();
+
+        // create the state directory for this task if missing (we won't create the parent directory)
+        createStateDirectory(baseDir);
+
+        // try to acquire the exclusive lock on the state directory
+        directoryLock = lockStateDirectory(baseDir);
+        if (directoryLock == null) {
+            throw new IOException("Failed to lock the state directory: " + baseDir.getCanonicalPath());
+        }
+
+        // load the checkpoint information
+        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
+        this.checkpointedOffsets = new HashMap<>(checkpoint.read());
+
+        // delete the checkpoint file after its stored offsets have been loaded
+        checkpoint.delete();
+    }
+
+    private static void createStateDirectory(File stateDir) throws IOException {
+        if (!stateDir.exists()) {
+            stateDir.mkdir();
+        }
+    }
+
+    public static FileLock lockStateDirectory(File stateDir) throws IOException {
+        File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
+        FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
+        try {
+            return channel.tryLock();
+        } catch (OverlappingFileLockException e) {
+            return null;
+        }
+    }
+
+    public File baseDir() {
+        return this.baseDir;
+    }
+
+    public void register(StateStore store, RestoreFunc restoreFunc) {
+        if (store.name().equals(CHECKPOINT_FILE_NAME))
+            throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME);
+
+        if (this.stores.containsKey(store.name()))
+            throw new IllegalArgumentException("Store " + store.name() + " has already been registered.");
+
+        // ---- register the store ---- //
+
+        // check whether the underlying change log topic exists
+        if (restoreConsumer.listTopics().containsKey(store.name())) {
+            boolean partitionNotFound = true;
+            for (PartitionInfo partitionInfo : restoreConsumer.partitionsFor(store.name())) {
+                if (partitionInfo.partition() == id) {
+                    partitionNotFound = false;
+                    break;
+                }
+            }
+
+            if (partitionNotFound)
+                throw new IllegalStateException("Store " + store.name() + "'s change log does not contain the partition for group " + id);
+
+        } else {
+            throw new IllegalStateException("Change log topic for store " + store.name() + " does not exist yet");
+        }
+
+        this.stores.put(store.name(), store);
+
+        // ---- try to restore the state from change-log ---- //
+
+        // subscribe to the store's partition
+        TopicPartition storePartition = new TopicPartition(store.name(), id);
+        if (!restoreConsumer.subscription().isEmpty()) {
+            throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand");
+        }
+        restoreConsumer.assign(Collections.singletonList(storePartition));
+
+        // calculate the end offset of the partition
+        // TODO: it is a bit hacky to first seek to the end and then call position() to get the end offset
+        restoreConsumer.seekToEnd(storePartition);
+        long endOffset = restoreConsumer.position(storePartition);
+
+        // load the previously flushed state and restore from the checkpointed offset of the change log
+        // if it exists in the offset file; restore the state from the beginning of the change log otherwise
+        if (checkpointedOffsets.containsKey(storePartition)) {
+            restoreConsumer.seek(storePartition, checkpointedOffsets.get(storePartition));
+        } else {
+            // TODO: in this case, we need to ignore the previously flushed state
+            restoreConsumer.seekToBeginning(storePartition);
+        }
+
+        // restore the store from its change log records; while restoring, the log end offset
+        // should not change since the change log is only written by this thread.
+        while (true) {
+            for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
+                restoreFunc.apply(record.key(), record.value());
+            }
+
+            if (restoreConsumer.position(storePartition) == endOffset) {
+                break;
+            } else if (restoreConsumer.position(storePartition) > endOffset) {
+                throw new IllegalStateException("Log end offset should not change while restoring");
+            }
+        }
+
+        // record the restored offset for its change log partition
+        long newOffset = restoreConsumer.position(storePartition);
+        restoredOffsets.put(storePartition, newOffset);
+
+        // un-assign the change log partition
+        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+    }
+
+    public StateStore getStore(String name) {
+        return stores.get(name);
+    }
+
+    public void cleanup() throws IOException {
+        // clean up any unknown files in the state directory
+        for (File file : this.baseDir.listFiles()) {
+            if (!this.stores.containsKey(file.getName())) {
+                log.info("Deleting state directory {}", file.getAbsolutePath());
+                file.delete();
+            }
+        }
+    }
+
+    public void flush() {
+        if (!this.stores.isEmpty()) {
+            log.debug("Flushing stores.");
+            for (StateStore store : this.stores.values())
+                store.flush();
+        }
+    }
+
+    public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
+        if (!stores.isEmpty()) {
+            log.debug("Closing stores.");
+            for (Map.Entry<String, StateStore> entry : stores.entrySet()) {
+                log.debug("Closing storage engine {}", entry.getKey());
+                entry.getValue().flush();
+                entry.getValue().close();
+            }
+
+            Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
+            for (String storeName : stores.keySet()) {
+                TopicPartition part = new TopicPartition(storeName, id);
+
+                // only checkpoint the offset to the offsets file if the store is persistent
+                if (stores.get(storeName).persistent()) {
+                    Long offset = ackedOffsets.get(part);
+
+                    if (offset == null) {
+                        // if no record was produced, we need to check the restored offset.
+                        offset = restoredOffsets.get(part);
+                    }
+
+                    if (offset != null) {
+                        // store the last offset + 1 (the log position after restoration)
+                        checkpointOffsets.put(part, offset + 1);
+                    }
+                }
+            }
+
+            // write the checkpoint file before closing, to indicate clean shutdown
+            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
+            checkpoint.write(checkpointOffsets);
+        }
+
+        // release the state directory lock
+        directoryLock.release();
+    }
+
+}

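A store implementation registers itself with this manager through ProcessorContext.register(); the manager then replays the store's change log partition (the partition id equals the task id) through the supplied RestoreFunc. A rough sketch of such a store, assuming RestoreFunc is the single-method callback apply(byte[], byte[]) used in register() above and that StateStore declares the name()/flush()/close()/persistent() methods referenced by this class:

    public class InMemoryStore implements StateStore {
        private final String name;
        // note: byte[] keys hash by identity; a real store would wrap or copy them
        private final Map<byte[], byte[]> data = new HashMap<>();

        public InMemoryStore(String name, ProcessorContext context) {
            this.name = name;
            context.register(this, new RestoreFunc() {
                @Override
                public void apply(byte[] key, byte[] value) {
                    data.put(key, value);   // replay one change log record into the store
                }
            });
        }

        @Override
        public String name() { return name; }

        @Override
        public void flush() { }

        @Override
        public void close() { }

        @Override
        public boolean persistent() { return false; }
    }

Registration fails if the change log topic (named after the store) does not exist or lacks a partition matching the task id, so that topic has to be created before the job is started.
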
http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
new file mode 100644
index 0000000..3efae65
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ProcessorTopology {
+
+    private final List<ProcessorNode> processorNodes;
+    private final Map<String, SourceNode> sourceByTopics;
+
+    public ProcessorTopology(List<ProcessorNode> processorNodes,
+                             Map<String, SourceNode> sourceByTopics) {
+        this.processorNodes = Collections.unmodifiableList(processorNodes);
+        this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics);
+    }
+
+    public Set<String> sourceTopics() {
+        return sourceByTopics.keySet();
+    }
+
+    public SourceNode source(String topic) {
+        return sourceByTopics.get(topic);
+    }
+
+    public Set<SourceNode> sources() {
+        return new HashSet<>(sourceByTopics.values());
+    }
+
+    public List<ProcessorNode> processors() {
+        return processorNodes;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
new file mode 100644
index 0000000..b4b7afe
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.PriorityQueue;
+
+public class PunctuationQueue {
+
+    private PriorityQueue<PunctuationSchedule> pq = new PriorityQueue<>();
+
+    public void schedule(PunctuationSchedule sched) {
+        synchronized (pq) {
+            pq.add(sched);
+        }
+    }
+
+    public void close() {
+        synchronized (pq) {
+            pq.clear();
+        }
+    }
+
+    public boolean mayPunctuate(long timestamp, Punctuator punctuator) {
+        synchronized (pq) {
+            boolean punctuated = false;
+            PunctuationSchedule top = pq.peek();
+            while (top != null && top.timestamp <= timestamp) {
+                PunctuationSchedule sched = top;
+                pq.poll();
+                punctuator.punctuate(sched.node(), timestamp);
+                pq.add(sched.next());
+                punctuated = true;
+
+                top = pq.peek();
+            }
+
+            return punctuated;
+        }
+    }
+
+}

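To make the rescheduling in mayPunctuate() concrete, here is a small self-contained sketch of how a caller (StreamTask, later in this patch, plays this role) drives the queue; the Punctuator below is just a stub that prints:

    PunctuationQueue queue = new PunctuationQueue();
    ProcessorNode<String, String> node = new ProcessorNode<>("NODE");

    // schedule from stream time 0 with a 1000 ms interval: first due at 1000
    queue.schedule(new PunctuationSchedule(node, 0L, 1000L));

    Punctuator printer = new Punctuator() {
        @Override
        public void punctuate(ProcessorNode n, long streamTime) {
            System.out.println(n.name() + " punctuated at " + streamTime);
        }
    };

    queue.mayPunctuate(500L, printer);    // returns false: nothing is due yet
    queue.mayPunctuate(2500L, printer);   // returns true: fires the schedules stamped 1000 and 2000,
                                          // each re-added via next(), leaving the next one due at 3000
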
http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
new file mode 100644
index 0000000..dc9a50d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+public class PunctuationSchedule extends Stamped<ProcessorNode> {
+
+    final long interval;
+
+    public PunctuationSchedule(ProcessorNode node, long interval) {
+        this(node, System.currentTimeMillis(), interval);
+    }
+
+    public PunctuationSchedule(ProcessorNode node, long time, long interval) {
+        super(node, time + interval);
+        this.interval = interval;
+    }
+
+    public ProcessorNode node() {
+        return value;
+    }
+
+    public PunctuationSchedule next() {
+        return new PunctuationSchedule(value, timestamp, interval);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java
new file mode 100644
index 0000000..d99e2ae
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+public interface Punctuator {
+
+    void punctuate(ProcessorNode node, long streamTime);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
new file mode 100644
index 0000000..ad2f647
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RecordCollector {
+
+    private static final Logger log = LoggerFactory.getLogger(RecordCollector.class);
+
+    private final Producer<byte[], byte[]> producer;
+    private final Map<TopicPartition, Long> offsets;
+    private final Callback callback = new Callback() {
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            if (exception == null) {
+                TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
+                offsets.put(tp, metadata.offset());
+            } else {
+                log.error("Error sending record: ", exception);
+            }
+        }
+    };
+
+
+    public RecordCollector(Producer<byte[], byte[]> producer) {
+        this.producer = producer;
+        this.offsets = new HashMap<>();
+    }
+
+    public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
+        byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
+        this.producer.send(new ProducerRecord<>(record.topic(), keyBytes, valBytes), callback);
+    }
+
+    public void flush() {
+        this.producer.flush();
+    }
+
+    /**
+     * Closes this RecordCollector
+     */
+    public void close() {
+        producer.close();
+    }
+
+    /**
+     * The last acked offsets from the producer
+     *
+     * @return the map from TopicPartition to offset
+     */
+    Map<TopicPartition, Long> offsets() {
+        return this.offsets;
+    }
+}

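The collector exists so that the produced (acked) offsets can later be handed to ProcessorStateManager.close() for checkpointing. A usage sketch, assuming the caller lives in the same package (offsets() is package-private); the producer construction and topic name are illustrative:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");

    RecordCollector collector = new RecordCollector(
        new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer()));

    collector.send(new ProducerRecord<>("output-topic", "key", "value"),
                   new StringSerializer(), new StringSerializer());

    collector.flush();                                     // wait for outstanding sends (and their callbacks)
    Map<TopicPartition, Long> acked = collector.offsets(); // per-partition offset of the last acked record
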
http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
new file mode 100644
index 0000000..66f78d2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+import java.util.ArrayDeque;
+
+/**
+ * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the
+ * partition timestamp defined as the minimum timestamp of records in its queue; in addition, its partition
+ * timestamp is monotonically increasing such that once it is advanced, it will not be decremented.
+ */
+public class RecordQueue {
+
+    private final SourceNode source;
+    private final TopicPartition partition;
+    private final ArrayDeque<StampedRecord> fifoQueue;
+    private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker;
+
+    private long partitionTime = TimestampTracker.NOT_KNOWN;
+
+    public RecordQueue(TopicPartition partition, SourceNode source) {
+        this.partition = partition;
+        this.source = source;
+
+        this.fifoQueue = new ArrayDeque<>();
+        this.timeTracker = new MinTimestampTracker<>();
+    }
+
+    /**
+     * Returns the corresponding source node in the topology
+     *
+     * @return SourceNode
+     */
+    public SourceNode source() {
+        return source;
+    }
+
+    /**
+     * Returns the partition with which this queue is associated
+     *
+     * @return TopicPartition
+     */
+    public TopicPartition partition() {
+        return partition;
+    }
+
+    /**
+     * Adds a batch of {@link ConsumerRecord}s to the queue
+     *
+     * @param rawRecords the raw records
+     * @param timestampExtractor TimestampExtractor
+     * @return the size of this queue
+     */
+    public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords, TimestampExtractor timestampExtractor) {
+        for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+            // deserialize the raw record, extract the timestamp and put into the queue
+            Object key = source.deserializeKey(rawRecord.topic(), rawRecord.key());
+            Object value = source.deserializeValue(rawRecord.topic(), rawRecord.value());
+
+            ConsumerRecord<Object, Object> record = new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), key, value);
+            long timestamp = timestampExtractor.extract(record);
+
+            StampedRecord stampedRecord = new StampedRecord(record, timestamp);
+
+            fifoQueue.addLast(stampedRecord);
+            timeTracker.addElement(stampedRecord);
+        }
+
+        return size();
+    }
+
+    /**
+     * Get the next {@link StampedRecord} from the queue
+     *
+     * @return StampedRecord
+     */
+    public StampedRecord poll() {
+        StampedRecord elem = fifoQueue.pollFirst();
+
+        if (elem == null)
+            return null;
+
+        timeTracker.removeElement(elem);
+
+        // only advance the partition timestamp if the currently
+        // tracked min timestamp exceeds its current value
+        long timestamp = timeTracker.get();
+
+        if (timestamp > partitionTime)
+            partitionTime = timestamp;
+
+        return elem;
+    }
+
+    /**
+     * Returns the number of records in the queue
+     *
+     * @return the number of records
+     */
+    public int size() {
+        return fifoQueue.size();
+    }
+
+    /**
+     * Tests if the queue is empty
+     *
+     * @return true if the queue is empty, otherwise false
+     */
+    public boolean isEmpty() {
+        return fifoQueue.isEmpty();
+    }
+
+    /**
+     * Returns the tracked partition timestamp
+     *
+     * @return timestamp
+     */
+    public long timestamp() {
+        return partitionTime;
+    }
+}

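To make the partition-time bookkeeping above concrete (assuming MinTimestampTracker, which is part of this commit but not quoted in this message, reports the smallest timestamp of the elements it currently tracks): if records with timestamps 5, 3 and 7 are added in that order, timestamp() stays at TimestampTracker.NOT_KNOWN until the first poll(). Polling the timestamp-5 record leaves {3, 7} tracked, so the partition time becomes 3; polling the timestamp-3 record leaves {7}, so it advances to 7. The value only ever moves forward: a late record with timestamp 2 would still be queued and handed out by poll(), but it would never pull the partition time back below 7.
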
http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
new file mode 100644
index 0000000..e2d881c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+public class SinkNode<K, V> extends ProcessorNode<K, V> {
+
+    private final String topic;
+    private Serializer<K> keySerializer;
+    private Serializer<V> valSerializer;
+
+    private ProcessorContext context;
+
+    public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
+        super(name);
+
+        this.topic = topic;
+        this.keySerializer = keySerializer;
+        this.valSerializer = valSerializer;
+    }
+
+    @Override
+    public void addChild(ProcessorNode<?, ?> child) {
+        throw new UnsupportedOperationException("sink node does not allow addChild");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void init(ProcessorContext context) {
+        this.context = context;
+        if (this.keySerializer == null) this.keySerializer = (Serializer<K>) context.keySerializer();
+        if (this.valSerializer == null) this.valSerializer = (Serializer<V>) context.valueSerializer();
+    }
+
+    @Override
+    public void process(K key, V value) {
+        // send to the sink's topic
+        RecordCollector collector = ((ProcessorContextImpl) context).recordCollector();
+        collector.send(new ProducerRecord<>(topic, key, value), keySerializer, valSerializer);
+    }
+
+    @Override
+    public void close() {
+        // do nothing
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
new file mode 100644
index 0000000..fa4afaf
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+public class SourceNode<K, V> extends ProcessorNode<K, V> {
+
+    private Deserializer<K> keyDeserializer;
+    private Deserializer<V> valDeserializer;
+    private ProcessorContext context;
+
+    public SourceNode(String name, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) {
+        super(name);
+
+        this.keyDeserializer = keyDeserializer;
+        this.valDeserializer = valDeserializer;
+    }
+
+    public K deserializeKey(String topic, byte[] data) {
+        return keyDeserializer.deserialize(topic, data);
+    }
+
+    public V deserializeValue(String topic, byte[] data) {
+        return valDeserializer.deserialize(topic, data);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void init(ProcessorContext context) {
+        this.context = context;
+
+        // if deserializers are null, get the default ones from the context
+        if (this.keyDeserializer == null) this.keyDeserializer = (Deserializer<K>) context.keyDeserializer();
+        if (this.valDeserializer == null) this.valDeserializer = (Deserializer<V>) context.valueDeserializer();
+    }
+
+    @Override
+    public void process(K key, V value) {
+        context.forward(key, value);
+    }
+
+    @Override
+    public void close() {
+        // do nothing
+    }
+
+}

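SourceNode and SinkNode are the entry and exit points of a topology. A rough wiring sketch follows; the topic name is illustrative, and serializers/deserializers are left null so that init() falls back to the defaults configured in StreamingConfig (the forwarding path from source to sink goes through StreamTask.forward(), which is beyond the portion of the patch quoted in this message):

    SourceNode<String, String> source = new SourceNode<>("SOURCE", null, null);
    SinkNode<String, String> sink = new SinkNode<>("SINK", "output-topic", null, null);
    source.addChild(sink);

    // the owning StreamTask calls init(context) on every node, filling in the default serdes;
    // records deserialized by the source are forwarded through the context to the node's
    // children (here the sink), and the sink hands them to the RecordCollector for "output-topic"
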
http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
new file mode 100644
index 0000000..4e44667
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+public class Stamped<V> implements Comparable {
+
+    public final V value;
+    public final long timestamp;
+
+    public Stamped(V value, long timestamp) {
+        this.value = value;
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public int compareTo(Object other) {
+        long otherTimestamp = ((Stamped<?>) other).timestamp;
+
+        if (timestamp < otherTimestamp) return -1;
+        else if (timestamp > otherTimestamp) return 1;
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
new file mode 100644
index 0000000..febd938
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public class StampedRecord extends Stamped<ConsumerRecord<Object, Object>> {
+
+    public StampedRecord(ConsumerRecord<Object, Object> record, long timestamp) {
+        super(record, timestamp);
+    }
+
+    public String topic() {
+        return value.topic();
+    }
+
+    public int partition() {
+        return value.partition();
+    }
+
+    public Object key() {
+        return value.key();
+    }
+
+    public Object value() {
+        return value.value();
+    }
+
+    public long offset() {
+        return value.offset();
+    }
+
+    @Override
+    public String toString() {
+        return value.toString() + ", timestamp = " + timestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
new file mode 100644
index 0000000..40fb723
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing.
+ */
+public class StreamTask implements Punctuator {
+
+    private static final Logger log = LoggerFactory.getLogger(StreamTask.class);
+
+    private final int id;
+    private final int maxBufferedSize;
+
+    private final Consumer<byte[], byte[]> consumer;
+    private final PartitionGroup partitionGroup;
+    private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
+    private final PunctuationQueue punctuationQueue;
+    private final ProcessorContextImpl processorContext;
+    private final ProcessorTopology topology;
+
+    private final Map<TopicPartition, Long> consumedOffsets;
+    private final RecordCollector recordCollector;
+    private final ProcessorStateManager stateMgr;
+
+    private boolean commitRequested = false;
+    private boolean commitOffsetNeeded = false;
+    private StampedRecord currRecord = null;
+    private ProcessorNode currNode = null;
+
+    /**
+     * Create {@link StreamTask} with its assigned partitions
+     *
+     * @param id                    the ID of this task
+     * @param consumer              the instance of {@link Consumer}
+     * @param producer              the instance of {@link Producer}
+     * @param restoreConsumer       the instance of {@link Consumer} used when restoring state
+     * @param partitions            the collection of assigned {@link TopicPartition}
+     * @param topology              the instance of {@link ProcessorTopology}
+     * @param config                the {@link StreamingConfig} specified by the user
+     */
+    public StreamTask(int id,
+                      Consumer<byte[], byte[]> consumer,
+                      Producer<byte[], byte[]> producer,
+                      Consumer<byte[], byte[]> restoreConsumer,
+                      Collection<TopicPartition> partitions,
+                      ProcessorTopology topology,
+                      StreamingConfig config) {
+
+        this.id = id;
+        this.consumer = consumer;
+        this.punctuationQueue = new PunctuationQueue();
+        this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+        this.topology = topology;
+
+        // create queues for each assigned partition and associate them
+        // to corresponding source nodes in the processor topology
+        Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
+
+        for (TopicPartition partition : partitions) {
+            SourceNode source = topology.source(partition.topic());
+            RecordQueue queue = createRecordQueue(partition, source);
+            partitionQueues.put(partition, queue);
+        }
+
+        TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
+        this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
+
+        // initialize the consumed and produced offset cache
+        this.consumedOffsets = new HashMap<>();
+
+        // create the record collector that maintains the produced offsets
+        this.recordCollector = new RecordCollector(producer);
+
+        log.info("Creating restoration consumer client for stream task [" + id + "]");
+
+        // create the processor state manager
+        try {
+            File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), Integer.toString(id));
+            this.stateMgr = new ProcessorStateManager(id, stateFile, restoreConsumer);
+        } catch (IOException e) {
+            throw new KafkaException("Error while creating the state manager", e);
+        }
+
+        // initialize the topology with its own context
+        this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, new Metrics());
+
+        // initialize the task by initializing all its processor nodes in the topology
+        for (ProcessorNode node : this.topology.processors()) {
+            this.currNode = node;
+            try {
+                node.init(this.processorContext);
+            } finally {
+                this.currNode = null;
+            }
+        }
+
+        this.processorContext.initialized();
+    }
+
+    public int id() {
+        return id;
+    }
+
+    public Set<TopicPartition> partitions() {
+        return this.partitionGroup.partitions();
+    }
+
+    /**
+     * Adds records to queues
+     *
+     * @param partition the partition
+     * @param records  the records
+     */
+    @SuppressWarnings("unchecked")
+    public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
+        int queueSize = partitionGroup.addRawRecords(partition, records);
+
+        // if adding these records pushed the partition queue's buffered size
+        // beyond the threshold, pause consumption for this partition
+        if (queueSize > this.maxBufferedSize) {
+            consumer.pause(partition);
+        }
+    }
+
+    /**
+     * Process one record
+     *
+     * @return number of records left in the buffer of this task's partition group after the processing is done
+     */
+    @SuppressWarnings("unchecked")
+    public int process() {
+        synchronized (this) {
+            // get the next record to process
+            StampedRecord record = partitionGroup.nextRecord(recordInfo);
+
+            // if there is no record to process, return immediately
+            if (record == null)
+                return 0;
+
+            try {
+                // process the record by passing to the source node of the topology
+                this.currRecord = record;
+                this.currNode = recordInfo.node();
+                TopicPartition partition = recordInfo.partition();
+
+                log.debug("Start processing one record [" + currRecord + "]");
+
+                this.currNode.process(currRecord.key(), currRecord.value());
+
+                log.debug("Completed processing one record [" + currRecord + "]");
+
+                // update the consumed offset map after processing is done
+                consumedOffsets.put(partition, currRecord.offset());
+                commitOffsetNeeded = true;
+
+                // after processing this record, if the partition queue's buffered size has
+                // dropped back to the threshold, resume consumption for this partition
+                if (partitionGroup.numBuffered(partition) == this.maxBufferedSize) {
+                    consumer.resume(partition);
+                }
+            } finally {
+                this.currRecord = null;
+                this.currNode = null;
+            }
+
+            return partitionGroup.numBuffered();
+        }
+    }
+
+    /**
+     * Possibly trigger registered punctuation functions if
+     * current time has reached the scheduled time
+     *
+     * @param timestamp the current time in milliseconds
+     */
+    public boolean maybePunctuate(long timestamp) {
+        return punctuationQueue.mayPunctuate(timestamp, this);
+    }
+
+    @Override
+    public void punctuate(ProcessorNode node, long timestamp) {
+        if (currNode != null)
+            throw new IllegalStateException("Current node is not null");
+
+        currNode = node;
+        try {
+            node.processor().punctuate(timestamp);
+        } finally {
+            currNode = null;
+        }
+    }
+
+    public StampedRecord record() {
+        return this.currRecord;
+    }
+
+    public ProcessorNode node() {
+        return this.currNode;
+    }
+
+    public ProcessorTopology topology() {
+        return this.topology;
+    }
+
+    /**
+     * Commit the current task state
+     */
+    public void commit() {
+        // 1) flush records produced for downstream topics and for the change logs of local state stores
+        recordCollector.flush();
+
+        // 2) flush local state
+        stateMgr.flush();
+
+        // 3) commit consumed offsets if they are dirty (i.e. new records have been processed since the last commit)
+        if (commitOffsetNeeded) {
+            Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
+            for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
+                consumedOffsetsAndMetadata.put(entry.getKey(), new OffsetAndMetadata(entry.getValue()));
+            }
+            consumer.commitSync(consumedOffsetsAndMetadata);
+            commitOffsetNeeded = false;
+        }
+
+        commitRequested = false;
+    }
+
+    /**
+     * Whether or not a request has been made to commit the current state
+     */
+    public boolean commitNeeded() {
+        return this.commitRequested;
+    }
+
+    /**
+     * Request committing the current task's state
+     */
+    public void needCommit() {
+        this.commitRequested = true;
+    }
+
+    /**
+     * Schedules a punctuation for the processor
+     *
+     * @param interval  the interval in milliseconds
+     */
+    public void schedule(long interval) {
+        if (currNode == null)
+            throw new IllegalStateException("Current node is null");
+
+        punctuationQueue.schedule(new PunctuationSchedule(currNode, interval));
+    }
+
+    public void close() {
+        this.partitionGroup.close();
+        this.consumedOffsets.clear();
+
+        // close the processors
+        // make sure close() is called for each node even when there is a RuntimeException
+        RuntimeException exception = null;
+        for (ProcessorNode node : this.topology.processors()) {
+            currNode = node;
+            try {
+                node.close();
+            } catch (RuntimeException e) {
+                exception = e;
+            } finally {
+                currNode = null;
+            }
+        }
+
+        if (exception != null)
+            throw exception;
+
+        try {
+            stateMgr.close(recordCollector.offsets());
+        } catch (IOException e) {
+            throw new KafkaException("Error while closing the state manager in processor context", e);
+        }
+    }
+
+    private RecordQueue createRecordQueue(TopicPartition partition, SourceNode source) {
+        return new RecordQueue(partition, source);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <K, V> void forward(K key, V value) {
+        ProcessorNode thisNode = currNode;
+        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+            currNode = childNode;
+            try {
+                childNode.process(key, value);
+            } finally {
+                currNode = thisNode;
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public <K, V> void forward(K key, V value, int childIndex) {
+        ProcessorNode thisNode = currNode;
+        ProcessorNode childNode = (ProcessorNode<K, V>) thisNode.children().get(childIndex);
+        currNode = childNode;
+        try {
+            childNode.process(key, value);
+        } finally {
+            currNode = thisNode;
+        }
+    }
+
+    public ProcessorContext context() {
+        return processorContext;
+    }
+
+}
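
The addRecords()/process() pair above forms a simple back-pressure loop: a partition is paused once its queue grows past maxBufferedSize and resumed once processing drains it back to exactly that threshold, so resume() fires at most once per pause. Below is a minimal standalone sketch of the same idea, assuming the pause(TopicPartition...)/resume(TopicPartition...) consumer methods used by the task code above; the class and its names are illustrative and not part of this patch:

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Illustrative sketch only: per-partition buffering with pause/resume back pressure.
    class PartitionBackpressureSketch<K, V> {

        private final Consumer<K, V> consumer;
        private final TopicPartition partition;
        private final int maxBufferedSize;
        private final Deque<ConsumerRecord<K, V>> buffer = new ArrayDeque<>();

        PartitionBackpressureSketch(Consumer<K, V> consumer, TopicPartition partition, int maxBufferedSize) {
            this.consumer = consumer;
            this.partition = partition;
            this.maxBufferedSize = maxBufferedSize;
        }

        // mirrors addRecords(): stop fetching once the queue grows past the threshold
        void add(ConsumerRecord<K, V> record) {
            buffer.addLast(record);
            if (buffer.size() > maxBufferedSize)
                consumer.pause(partition);
        }

        // mirrors process(): resume fetching once the queue has drained back to the threshold
        ConsumerRecord<K, V> next() {
            ConsumerRecord<K, V> record = buffer.pollFirst();
            if (record != null && buffer.size() == maxBufferedSize)
                consumer.resume(partition);
            return record;
        }
    }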

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
new file mode 100644
index 0000000..f37903f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileLock;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class StreamThread extends Thread {
+
+    private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
+    private static final AtomicInteger nextThreadNumber = new AtomicInteger(1);
+
+    private final AtomicBoolean running;
+
+    protected final StreamingConfig config;
+    protected final TopologyBuilder builder;
+    protected final Producer<byte[], byte[]> producer;
+    protected final Consumer<byte[], byte[]> consumer;
+    protected final Consumer<byte[], byte[]> restoreConsumer;
+
+    private final Map<Integer, StreamTask> tasks;
+    private final Time time;
+    private final File stateDir;
+    private final long pollTimeMs;
+    private final long cleanTimeMs;
+    private final long commitTimeMs;
+    private final long totalRecordsToProcess;
+    private final StreamingMetrics metrics;
+
+    private long lastClean;
+    private long lastCommit;
+    private long recordsProcessed;
+
+    final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
+            addPartitions(assignment);
+            lastClean = time.milliseconds(); // start the cleaning cycle
+        }
+
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
+            commitAll();
+            removePartitions();
+            lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
+        }
+    };
+
+    public StreamThread(TopologyBuilder builder, StreamingConfig config) throws Exception {
+        this(builder, config, null, null, null, new SystemTime());
+    }
+
+    StreamThread(TopologyBuilder builder, StreamingConfig config,
+                 Producer<byte[], byte[]> producer,
+                 Consumer<byte[], byte[]> consumer,
+                 Consumer<byte[], byte[]> restoreConsumer,
+                 Time time) throws Exception {
+        super("StreamThread-" + nextThreadNumber.getAndIncrement());
+
+        this.config = config;
+        this.builder = builder;
+
+        // set the producer and consumer clients
+        this.producer = (producer != null) ? producer : createProducer();
+        this.consumer = (consumer != null) ? consumer : createConsumer();
+        this.restoreConsumer = (restoreConsumer != null) ? restoreConsumer : createRestoreConsumer();
+
+        // initialize the task list
+        this.tasks = new HashMap<>();
+
+        // read in task specific config values
+        this.stateDir = new File(this.config.getString(StreamingConfig.STATE_DIR_CONFIG));
+        this.stateDir.mkdir();
+        this.pollTimeMs = config.getLong(StreamingConfig.POLL_MS_CONFIG);
+        this.commitTimeMs = config.getLong(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG);
+        this.cleanTimeMs = config.getLong(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
+        this.totalRecordsToProcess = config.getLong(StreamingConfig.TOTAL_RECORDS_TO_PROCESS);
+
+        this.lastClean = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
+        this.lastCommit = time.milliseconds();
+        this.recordsProcessed = 0;
+        this.time = time;
+
+        this.metrics = new StreamingMetrics();
+
+        this.running = new AtomicBoolean(true);
+    }
+
+    private Producer<byte[], byte[]> createProducer() {
+        log.info("Creating producer client for stream thread [" + this.getName() + "]");
+        return new KafkaProducer<>(config.getProducerConfigs(),
+                new ByteArraySerializer(),
+                new ByteArraySerializer());
+    }
+
+    private Consumer<byte[], byte[]> createConsumer() {
+        log.info("Creating consumer client for stream thread [" + this.getName() + "]");
+        return new KafkaConsumer<>(config.getConsumerConfigs(),
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());
+    }
+
+    private Consumer<byte[], byte[]> createRestoreConsumer() {
+        log.info("Creating restore consumer client for stream thread [" + this.getName() + "]");
+        return new KafkaConsumer<>(config.getConsumerConfigs(),
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());
+    }
+
+    /**
+     * Execute the stream processors
+     */
+    @Override
+    public void run() {
+        log.info("Starting stream thread [" + this.getName() + "]");
+
+        try {
+            runLoop();
+        } catch (RuntimeException e) {
+            log.error("Uncaught error during processing in thread [" + this.getName() + "]: ", e);
+            throw e;
+        } finally {
+            shutdown();
+        }
+    }
+
+    /**
+     * Signal this stream thread to shut down; the run loop will exit on its next iteration.
+     */
+    public void close() {
+        running.set(false);
+    }
+
+    public Map<Integer, StreamTask> tasks() {
+        return Collections.unmodifiableMap(tasks);
+    }
+
+    private void shutdown() {
+        log.info("Shutting down stream thread [" + this.getName() + "]");
+
+        // Exceptions should not prevent this call from going through all shutdown steps.
+        try {
+            commitAll();
+        } catch (Throwable e) {
+            // already logged in commitAll()
+        }
+        try {
+            producer.close();
+        } catch (Throwable e) {
+            log.error("Failed to close producer in thread [" + this.getName() + "]: ", e);
+        }
+        try {
+            consumer.close();
+        } catch (Throwable e) {
+            log.error("Failed to close consumer in thread [" + this.getName() + "]: ", e);
+        }
+        try {
+            restoreConsumer.close();
+        } catch (Throwable e) {
+            log.error("Failed to close restore consumer in thread [" + this.getName() + "]: ", e);
+        }
+        try {
+            removePartitions();
+        } catch (Throwable e) {
+            // already logged in removePartitions()
+        }
+
+        log.info("Stream thread shutdown complete [" + this.getName() + "]");
+    }
+
+    private void runLoop() {
+        try {
+            int totalNumBuffered = 0;
+
+            consumer.subscribe(new ArrayList<>(builder.sourceTopics()), rebalanceListener);
+
+            while (stillRunning()) {
+                long startPoll = time.milliseconds();
+
+                // try to fetch some records if necessary
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
+
+                for (StreamTask task : tasks.values()) {
+                    for (TopicPartition partition : task.partitions()) {
+                        task.addRecords(partition, records.records(partition));
+                    }
+                }
+
+                long endPoll = time.milliseconds();
+                metrics.pollTimeSensor.record(endPoll - startPoll);
+
+                // try to process one record from each task
+                totalNumBuffered = 0;
+
+                for (StreamTask task : tasks.values()) {
+                    long startProcess = time.milliseconds();
+
+                    totalNumBuffered += task.process();
+
+                    metrics.processTimeSensor.record(time.milliseconds() - startProcess);
+                }
+
+                maybePunctuate();
+                maybeClean();
+                maybeCommit();
+            }
+        } catch (Exception e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    private boolean stillRunning() {
+        if (!running.get()) {
+            log.debug("Shutting down at user request.");
+            return false;
+        }
+
+        if (totalRecordsToProcess >= 0 && recordsProcessed >= totalRecordsToProcess) {
+            log.debug("Shutting down as we've reached the user configured limit of {} records to process.", totalRecordsToProcess);
+            return false;
+        }
+
+        return true;
+    }
+
+    private void maybePunctuate() {
+        for (StreamTask task : tasks.values()) {
+            try {
+                long now = time.milliseconds();
+
+                if (task.maybePunctuate(now))
+                    metrics.punctuateTimeSensor.record(time.milliseconds() - now);
+
+            } catch (Exception e) {
+                log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+                throw e;
+            }
+        }
+    }
+
+    protected void maybeCommit() {
+        long now = time.milliseconds();
+
+        if (commitTimeMs >= 0 && lastCommit + commitTimeMs < now) {
+            log.trace("Committing processor instances because the commit interval has elapsed.");
+
+            commitAll();
+            lastCommit = now;
+        } else {
+            for (StreamTask task : tasks.values()) {
+                try {
+                    if (task.commitNeeded())
+                        commitOne(task, time.milliseconds());
+                } catch (Exception e) {
+                    log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+                    throw e;
+                }
+            }
+        }
+    }
+
+    /**
+     * Commit the states of all its tasks
+     */
+    private void commitAll() {
+        for (StreamTask task : tasks.values()) {
+            try {
+                commitOne(task, time.milliseconds());
+            } catch (Exception e) {
+                log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Commit the state of a task
+     */
+    private void commitOne(StreamTask task, long now) {
+        try {
+            task.commit();
+        } catch (Exception e) {
+            log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+            throw e;
+        }
+
+        metrics.commitTimeSensor.record(time.milliseconds() - now);
+    }
+
+    /**
+     * Clean up the state directories of tasks that have been removed from this thread
+     */
+    protected void maybeClean() {
+        long now = time.milliseconds();
+
+        if (now > lastClean + cleanTimeMs) {
+            File[] stateDirs = stateDir.listFiles();
+            if (stateDirs != null) {
+                for (File dir : stateDirs) {
+                    try {
+                        Integer id = Integer.parseInt(dir.getName());
+
+                        // try to acquire the exclusive lock on the state directory
+                        FileLock directoryLock = null;
+                        try {
+                            directoryLock = ProcessorStateManager.lockStateDirectory(dir);
+                            if (directoryLock != null) {
+                                log.info("Deleting obsolete state directory {} after delayed {} ms.", dir.getAbsolutePath(), cleanTimeMs);
+                                Utils.delete(dir);
+                            }
+                        } catch (IOException e) {
+                            log.error("Failed to lock the state directory due to an unexpected exception", e);
+                        } finally {
+                            if (directoryLock != null) {
+                                try {
+                                    directoryLock.release();
+                                } catch (IOException e) {
+                                    log.error("Failed to release the state directory lock");
+                                }
+                            }
+                        }
+                    } catch (NumberFormatException e) {
+                        // there may be unrelated files in the same directory;
+                        // ignore them instead of trying to delete them as well
+                    }
+                }
+            }
+
+            lastClean = now;
+        }
+    }
+
+    protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
+        metrics.taskCreationSensor.record();
+
+        return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, builder.build(), config);
+    }
+
+    private void addPartitions(Collection<TopicPartition> assignment) {
+        HashSet<TopicPartition> partitions = new HashSet<>(assignment);
+
+        // TODO: change this hard-coded co-partitioning behavior
+        for (TopicPartition partition : partitions) {
+            final Integer id = partition.partition();
+            StreamTask task = tasks.get(id);
+            if (task == null) {
+                // get the partitions for the task
+                HashSet<TopicPartition> partitionsForTask = new HashSet<>();
+                for (TopicPartition part : partitions)
+                    if (part.partition() == id)
+                        partitionsForTask.add(part);
+
+                // create the task
+                try {
+                    task = createStreamTask(id, partitionsForTask);
+                } catch (Exception e) {
+                    log.error("Failed to create a task #" + id + " in thread [" + this.getName() + "]: ", e);
+                    throw e;
+                }
+                tasks.put(id, task);
+            }
+        }
+
+        lastClean = time.milliseconds();
+    }
+
+    private void removePartitions() {
+
+        // TODO: change this clearing tasks behavior
+        for (StreamTask task : tasks.values()) {
+            log.info("Removing task {}", task.id());
+            try {
+                task.close();
+            } catch (Exception e) {
+                log.error("Failed to close a task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+                throw e;
+            }
+            metrics.taskDestructionSensor.record();
+        }
+        tasks.clear();
+    }
+
+    private class StreamingMetrics {
+        final Metrics metrics;
+
+        final Sensor commitTimeSensor;
+        final Sensor pollTimeSensor;
+        final Sensor processTimeSensor;
+        final Sensor punctuateTimeSensor;
+        final Sensor taskCreationSensor;
+        final Sensor taskDestructionSensor;
+
+        public StreamingMetrics() {
+            String metricGrpName = "streaming-metrics";
+
+            this.metrics = new Metrics();
+            Map<String, String> metricTags = new LinkedHashMap<>();
+            metricTags.put("client-id", config.getString(StreamingConfig.CLIENT_ID_CONFIG) + "-" + getName());
+
+            this.commitTimeSensor = metrics.sensor("commit-time");
+            this.commitTimeSensor.add(new MetricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg());
+            this.commitTimeSensor.add(new MetricName("commit-time-max", metricGrpName, "The maximum commit time in ms", metricTags), new Max());
+            this.commitTimeSensor.add(new MetricName("commit-calls-rate", metricGrpName, "The average per-second number of commit calls", metricTags), new Rate(new Count()));
+
+            this.pollTimeSensor = metrics.sensor("poll-time");
+            this.pollTimeSensor.add(new MetricName("poll-time-avg", metricGrpName, "The average poll time in ms", metricTags), new Avg());
+            this.pollTimeSensor.add(new MetricName("poll-time-max", metricGrpName, "The maximum poll time in ms", metricTags), new Max());
+            this.pollTimeSensor.add(new MetricName("poll-calls-rate", metricGrpName, "The average per-second number of record-poll calls", metricTags), new Rate(new Count()));
+
+            this.processTimeSensor = metrics.sensor("process-time");
+            this.processTimeSensor.add(new MetricName("process-time-avg-ms", metricGrpName, "The average process time in ms", metricTags), new Avg());
+            this.processTimeSensor.add(new MetricName("process-time-max-ms", metricGrpName, "The maximum process time in ms", metricTags), new Max());
+            this.processTimeSensor.add(new MetricName("process-calls-rate", metricGrpName, "The average per-second number of process calls", metricTags), new Rate(new Count()));
+
+            this.punctuateTimeSensor = metrics.sensor("punctuate-time");
+            this.punctuateTimeSensor.add(new MetricName("punctuate-time-avg", metricGrpName, "The average punctuate time in ms", metricTags), new Avg());
+            this.punctuateTimeSensor.add(new MetricName("punctuate-time-max", metricGrpName, "The maximum punctuate time in ms", metricTags), new Max());
+            this.punctuateTimeSensor.add(new MetricName("punctuate-calls-rate", metricGrpName, "The average per-second number of punctuate calls", metricTags), new Rate(new Count()));
+
+            this.taskCreationSensor = metrics.sensor("task-creation");
+            this.taskCreationSensor.add(new MetricName("task-creation-rate", metricGrpName, "The average per-second number of newly created tasks", metricTags), new Rate(new Count()));
+
+            this.taskDestructionSensor = metrics.sensor("task-destruction");
+            this.taskDestructionSensor.add(new MetricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
+        }
+    }
+}
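
StreamingMetrics above shows the instrumentation pattern used throughout this patch: one Sensor per logical operation, each registered with avg/max/rate stats under a shared metric group and tag map, and recorded with the elapsed time of the operation. Below is a minimal sketch of registering and recording such a sensor; the sensor and metric names here are illustrative, not part of the patch:

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;
    import org.apache.kafka.common.metrics.stats.Count;
    import org.apache.kafka.common.metrics.stats.Max;
    import org.apache.kafka.common.metrics.stats.Rate;

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class SensorSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();

            // tags distinguish this client/thread in the reported metrics
            Map<String, String> tags = new LinkedHashMap<>();
            tags.put("client-id", "example-client");

            // one sensor per logical operation, carrying avg, max and rate stats
            Sensor flushTime = metrics.sensor("flush-time");
            flushTime.add(new MetricName("flush-time-avg", "streaming-metrics", "The average flush time in ms", tags), new Avg());
            flushTime.add(new MetricName("flush-time-max", "streaming-metrics", "The maximum flush time in ms", tags), new Max());
            flushTime.add(new MetricName("flush-calls-rate", "streaming-metrics", "The average per-second number of flush calls", tags), new Rate(new Count()));

            long start = System.currentTimeMillis();
            // ... the measured work would run here ...
            flushTime.record(System.currentTimeMillis() - start);
        }
    }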

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
new file mode 100644
index 0000000..d8a012a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+/**
+ * TimestampTracker is a helper class for a sliding window implementation.
+ * It is assumed that stamped elements are added and removed in a FIFO manner.
+ * It maintains a timestamp (for example the min or the max timestamp) over the
+ * stamped elements that have been added but not yet removed.
+ */
+public interface TimestampTracker<E> {
+
+    static final long NOT_KNOWN = -1L;
+
+    /**
+     * Adds a stamped element to this tracker.
+     *
+     * @param elem the added element
+     */
+    void addElement(Stamped<E> elem);
+
+    /**
+     * Removes a stamped element from this tracker.
+     *
+     * @param elem the removed element
+     */
+    void removeElement(Stamped<E> elem);
+
+    /**
+     * Returns the current tracked timestamp
+     *
+     * @return timestamp, or {@link #NOT_KNOWN} when empty
+     */
+    long get();
+
+    /**
+     * Returns the size of internal structure. The meaning of "size" depends on the implementation.
+     *
+     * @return size
+     */
+    int size();
+
+}
\ No newline at end of file
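
As an illustration of how this interface is meant to be used, here is a minimal FIFO tracker of the smallest timestamp among the elements currently held. It assumes Stamped<E> (referenced above) exposes a public long timestamp field, and it sits in the same package only so it can see that type; it is a sketch, not the tracker implementation shipped in this commit:

    package org.apache.kafka.streams.processor.internals;

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Illustrative sketch: tracks the minimum timestamp of elements added but not yet
    // removed, relying on the interface's FIFO add/remove contract.
    public class MinTimestampTrackerSketch<E> implements TimestampTracker<E> {

        // candidates kept in non-decreasing timestamp order; the head is the current minimum
        private final Deque<Stamped<E>> minCandidates = new ArrayDeque<>();

        @Override
        public void addElement(Stamped<E> elem) {
            // earlier candidates with larger timestamps can never be the minimum again
            // once this smaller, longer-lived element arrives
            while (!minCandidates.isEmpty() && minCandidates.peekLast().timestamp > elem.timestamp)
                minCandidates.removeLast();
            minCandidates.addLast(elem);
        }

        @Override
        public void removeElement(Stamped<E> elem) {
            // the removed element is the oldest one; drop it if it is still a candidate
            if (minCandidates.peekFirst() == elem)
                minCandidates.removeFirst();
        }

        @Override
        public long get() {
            return minCandidates.isEmpty() ? NOT_KNOWN : minCandidates.peekFirst().timestamp;
        }

        @Override
        public int size() {
            return minCandidates.size();
        }
    }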

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Entry.java b/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
new file mode 100644
index 0000000..183b691
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+public class Entry<K, V> {
+
+    private final K key;
+    private final V value;
+
+    public Entry(K key, V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    public K key() {
+        return key;
+    }
+
+    public V value() {
+        return value;
+    }
+
+    public String toString() {
+        return "Entry(" + key() + ", " + value() + ")";
+    }
+
+}

