kafka-commits mailing list archives

From gwens...@apache.org
Subject [2/2] kafka git commit: KAFKA-2373: Add Kafka-backed offset storage for Copycat.
Date Fri, 25 Sep 2015 02:01:22 GMT
KAFKA-2373: Add Kafka-backed offset storage for Copycat.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Gwen Shapira, James Cheng

Closes #202 from ewencp/kafka-2373-copycat-distributed-offset


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/48b4d693
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/48b4d693
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/48b4d693

Branch: refs/heads/trunk
Commit: 48b4d6938dfb9dc72c941e979ff0d0c4d921f743
Parents: bcf374d
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Thu Sep 24 19:01:11 2015 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Thu Sep 24 19:01:11 2015 -0700

----------------------------------------------------------------------
 bin/copycat-distributed.sh                      |  23 +
 checkstyle/import-control.xml                   |  12 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |   5 +-
 .../kafka/clients/consumer/MockConsumer.java    | 191 ++++++--
 .../consumer/internals/SubscriptionState.java   |   5 +-
 .../clients/consumer/MockConsumerTest.java      |  12 +-
 config/copycat-distributed.properties           |  39 ++
 .../kafka/copycat/cli/CopycatDistributed.java   |  87 ++++
 .../kafka/copycat/cli/CopycatStandalone.java    |   6 +-
 .../apache/kafka/copycat/runtime/Worker.java    |  14 +-
 .../copycat/storage/FileOffsetBackingStore.java |  29 +-
 .../storage/KafkaOffsetBackingStore.java        | 393 ++++++++++++++++
 .../storage/MemoryOffsetBackingStore.java       |  18 +-
 .../copycat/storage/OffsetBackingStore.java     |  10 +-
 .../storage/OffsetStorageReaderImpl.java        |   4 +-
 .../copycat/storage/OffsetStorageWriter.java    |   5 +-
 .../copycat/util/ConvertingFutureCallback.java  |  84 ++++
 .../kafka/copycat/util/FutureCallback.java      |  52 +--
 .../kafka/copycat/runtime/WorkerTest.java       |  36 +-
 .../storage/FileOffsetBackingStoreTest.java     |   8 +-
 .../storage/KafkaOffsetBackingStoreTest.java    | 458 +++++++++++++++++++
 .../storage/OffsetStorageWriterTest.java        |   8 +-
 .../util/ByteArrayProducerRecordEquals.java     |  53 +++
 .../apache/kafka/copycat/util/TestFuture.java   |  81 ++++
 .../runtime/src/test/resources/log4j.properties |  23 +
 tests/kafkatest/services/copycat.py             | 110 +++--
 .../kafkatest/tests/copycat_distributed_test.py |  84 ++++
 tests/kafkatest/tests/copycat_test.py           |   2 +-
 .../templates/copycat-distributed.properties    |  32 ++
 29 files changed, 1685 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/bin/copycat-distributed.sh
----------------------------------------------------------------------
diff --git a/bin/copycat-distributed.sh b/bin/copycat-distributed.sh
new file mode 100755
index 0000000..4d62300
--- /dev/null
+++ b/bin/copycat-distributed.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)
+
+if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
+    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/copycat-log4j.properties"
+fi
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.copycat.cli.CopycatDistributed "$@"

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index f24977b..d58c472 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -27,6 +27,9 @@
 	<allow pkg="javax.management" />
 	<allow pkg="org.slf4j" />
 	<allow pkg="org.junit" />
+	<allow pkg="org.easymock" />
+	<allow pkg="org.powermock" />
+
 	<allow pkg="javax.net.ssl" />
 
 	<!-- no one depends on the server -->
@@ -124,6 +127,7 @@
 		<allow pkg="org.apache.kafka.common" />
 		<allow pkg="org.apache.kafka.copycat.data" />
 		<allow pkg="org.apache.kafka.copycat.errors" />
+		<allow pkg="org.apache.kafka.clients" />
 
 		<subpackage name="source">
 			<allow pkg="org.apache.kafka.copycat.connector" />
@@ -138,14 +142,11 @@
 
 		<subpackage name="runtime">
 			<allow pkg="org.apache.kafka.copycat" />
-			<allow pkg="org.apache.kafka.clients" />
-			<!-- for tests -->
-			<allow pkg="org.easymock" />
-			<allow pkg="org.powermock" />
 		</subpackage>
 
 		<subpackage name="cli">
 			<allow pkg="org.apache.kafka.copycat.runtime" />
+			<allow pkg="org.apache.kafka.copycat.storage" />
 			<allow pkg="org.apache.kafka.copycat.util" />
 			<allow pkg="org.apache.kafka.common" />
 		</subpackage>
@@ -153,9 +154,6 @@
 		<subpackage name="storage">
 			<allow pkg="org.apache.kafka.copycat" />
 			<allow pkg="org.apache.kafka.common.serialization" />
-			<!-- for tests -->
-			<allow pkg="org.easymock" />
-			<allow pkg="org.powermock" />
 		</subpackage>
 
 		<subpackage name="util">

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d2dcbe3..3009f6b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1023,10 +1023,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             Long offset = this.subscriptions.consumed(partition);
             if (offset == null) {
                 updateFetchPositions(Collections.singleton(partition));
-                return this.subscriptions.consumed(partition);
-            } else {
-                return offset;
+                offset = this.subscriptions.consumed(partition);
             }
+            return offset;
         } finally {
             release();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index e7e3618..1f802a8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -14,26 +14,32 @@ package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.regex.Pattern;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not
- * threadsafe </i>
- * <p>
- * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it needs to
- * communicate with. Failure to close the consumer after use will leak these resources.
+ * threadsafe </i>. However, you can use the {@link #waitForPollThen(Runnable,long)} method to write multithreaded tests
+ * where a driver thread waits for {@link #poll(long)} to be called and then can safely perform operations during a
+ * callback.
  */
 public class MockConsumer<K, V> implements Consumer<K, V> {
 
@@ -41,26 +47,45 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private final SubscriptionState subscriptions;
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
     private boolean closed;
+    private final Map<TopicPartition, Long> beginningOffsets;
+    private final Map<TopicPartition, Long> endOffsets;
+
+    private AtomicReference<CountDownLatch> pollLatch;
+    private KafkaException exception;
+
+    private AtomicBoolean wakeup;
 
     public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
         this.subscriptions = new SubscriptionState(offsetResetStrategy);
         this.partitions = new HashMap<String, List<PartitionInfo>>();
         this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
         this.closed = false;
+        this.beginningOffsets = new HashMap<>();
+        this.endOffsets = new HashMap<>();
+        this.pollLatch = new AtomicReference<>();
+        this.exception = null;
+        this.wakeup = new AtomicBoolean(false);
     }
     
     @Override
-    public synchronized Set<TopicPartition> assignment() {
+    public Set<TopicPartition> assignment() {
         return this.subscriptions.assignedPartitions();
     }
 
+    /** Simulate a rebalance event. */
+    public void rebalance(Collection<TopicPartition> newAssignment) {
+        // TODO: Rebalance callbacks
+        this.records.clear();
+        this.subscriptions.changePartitionAssignment(newAssignment);
+    }
+
     @Override
-    public synchronized Set<String> subscription() {
+    public Set<String> subscription() {
         return this.subscriptions.subscription();
     }
 
     @Override
-    public synchronized void subscribe(List<String> topics) {
+    public void subscribe(List<String> topics) {
         subscribe(topics, new NoOpConsumerRebalanceListener());
     }
 
@@ -79,13 +104,13 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public synchronized void subscribe(List<String> topics, final ConsumerRebalanceListener listener) {
+    public void subscribe(List<String> topics, final ConsumerRebalanceListener listener) {
         ensureNotClosed();
         this.subscriptions.subscribe(topics, listener);
     }
 
     @Override
-    public synchronized void assign(List<TopicPartition> partitions) {
+    public void assign(List<TopicPartition> partitions) {
         ensureNotClosed();
         this.subscriptions.assign(partitions);
     }
@@ -97,14 +122,39 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public synchronized ConsumerRecords<K, V> poll(long timeout) {
+    public ConsumerRecords<K, V> poll(long timeout) {
         ensureNotClosed();
+
+        CountDownLatch pollLatchCopy = pollLatch.get();
+        if (pollLatchCopy != null) {
+            pollLatch.set(null);
+            pollLatchCopy.countDown();
+            synchronized (pollLatchCopy) {
+                // Will block until the caller of waitForPollThen() finishes their callback.
+            }
+        }
+
+        if (wakeup.get()) {
+            wakeup.set(false);
+            throw new ConsumerWakeupException();
+        }
+
+        if (exception != null) {
+            RuntimeException exception = this.exception;
+            this.exception = null;
+            throw exception;
+        }
+
+        // Handle seeks that need to wait for a poll() call to be processed
+        for (TopicPartition tp : subscriptions.missingFetchPositions())
+            updateFetchPosition(tp);
+
         // update the consumed offset
         for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
             if (!subscriptions.isPaused(entry.getKey())) {
                 List<ConsumerRecord<K, V>> recs = entry.getValue();
                 if (!recs.isEmpty())
-                    this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset());
+                    this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset() + 1);
             }
         }
 
@@ -113,15 +163,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         return copy;
     }
 
-    public synchronized void addRecord(ConsumerRecord<K, V> record) {
+    public void addRecord(ConsumerRecord<K, V> record) {
         ensureNotClosed();
         TopicPartition tp = new TopicPartition(record.topic(), record.partition());
-        ArrayList<TopicPartition> currentAssigned = new ArrayList<>(this.subscriptions.assignedPartitions());
-        if (!currentAssigned.contains(tp)) {
-            currentAssigned.add(tp);
-            this.subscriptions.changePartitionAssignment(currentAssigned);
-        }
-        subscriptions.seek(tp, record.offset());
+        Set<TopicPartition> currentAssigned = new HashSet<>(this.subscriptions.assignedPartitions());
+        if (!currentAssigned.contains(tp))
+            throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
         List<ConsumerRecord<K, V>> recs = this.records.get(tp);
         if (recs == null) {
             recs = new ArrayList<ConsumerRecord<K, V>>();
@@ -130,10 +177,14 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         recs.add(record);
     }
 
+    public void setException(KafkaException exception) {
+        this.exception = exception;
+    }
+
     @Override
-    public synchronized void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
         ensureNotClosed();
-        for (Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet())
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet())
             subscriptions.committed(entry.getKey(), entry.getValue());
         if (callback != null) {
             callback.onComplete(offsets, null);
@@ -141,54 +192,71 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public synchronized void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
         commitAsync(offsets, null);
     }
 
     @Override
-    public synchronized void commitAsync() {
+    public void commitAsync() {
         commitAsync(null);
     }
 
     @Override
-    public synchronized void commitAsync(OffsetCommitCallback callback) {
+    public void commitAsync(OffsetCommitCallback callback) {
         ensureNotClosed();
         commitAsync(this.subscriptions.allConsumed(), callback);
     }
 
     @Override
-    public synchronized void commitSync() {
+    public void commitSync() {
         commitSync(this.subscriptions.allConsumed());
     }
 
     @Override
-    public synchronized void seek(TopicPartition partition, long offset) {
+    public void seek(TopicPartition partition, long offset) {
         ensureNotClosed();
         subscriptions.seek(partition, offset);
     }
 
     @Override
-    public synchronized OffsetAndMetadata committed(TopicPartition partition) {
+    public OffsetAndMetadata committed(TopicPartition partition) {
         ensureNotClosed();
         return subscriptions.committed(partition);
     }
 
     @Override
-    public synchronized long position(TopicPartition partition) {
+    public long position(TopicPartition partition) {
         ensureNotClosed();
-        return subscriptions.consumed(partition);
+        if (!this.subscriptions.isAssigned(partition))
+            throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
+        Long offset = this.subscriptions.consumed(partition);
+        if (offset == null) {
+            updateFetchPosition(partition);
+            offset = this.subscriptions.consumed(partition);
+        }
+        return offset;
     }
 
     @Override
-    public synchronized void seekToBeginning(TopicPartition... partitions) {
+    public void seekToBeginning(TopicPartition... partitions) {
         ensureNotClosed();
-        throw new UnsupportedOperationException();
+        for (TopicPartition tp : partitions)
+            subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
+    }
+
+    public void updateBeginningOffsets(Map<TopicPartition, Long> newOffsets) {
+        beginningOffsets.putAll(newOffsets);
     }
 
     @Override
-    public synchronized void seekToEnd(TopicPartition... partitions) {
+    public void seekToEnd(TopicPartition... partitions) {
         ensureNotClosed();
-        throw new UnsupportedOperationException();
+        for (TopicPartition tp : partitions)
+            subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+    }
+
+    public void updateEndOffsets(Map<TopicPartition, Long> newOffsets) {
+        endOffsets.putAll(newOffsets);
     }
 
     @Override
@@ -198,7 +266,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public synchronized List<PartitionInfo> partitionsFor(String topic) {
+    public List<PartitionInfo> partitionsFor(String topic) {
         ensureNotClosed();
         List<PartitionInfo> parts = this.partitions.get(topic);
         if (parts == null)
@@ -213,7 +281,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         return partitions;
     }
 
-    public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions) {
+    public void updatePartitions(String topic, List<PartitionInfo> partitions) {
         ensureNotClosed();
         this.partitions.put(topic, partitions);
     }
@@ -231,18 +299,69 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public synchronized void close() {
+    public void close() {
         ensureNotClosed();
         this.closed = true;
     }
 
+    public boolean closed() {
+        return this.closed;
+    }
+
     @Override
     public void wakeup() {
+        wakeup.set(true);
+    }
 
+    public void waitForPoll(long timeoutMs) {
+        waitForPollThen(null, timeoutMs);
+    }
+
+    public void waitForPollThen(Runnable task, long timeoutMs) {
+        CountDownLatch latch = new CountDownLatch(1);
+        synchronized (latch) {
+            pollLatch.set(latch);
+            try {
+                if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS))
+                    throw new TimeoutException("Timed out waiting for consumer thread to call poll().");
+            } catch (InterruptedException e) {
+                throw new IllegalStateException("MockConsumer waiting thread was interrupted.", e);
+            }
+            if (task != null)
+                task.run();
+        }
     }
 
     private void ensureNotClosed() {
         if (this.closed)
             throw new IllegalStateException("This consumer has already been closed.");
     }
+
+    private void updateFetchPosition(TopicPartition tp) {
+        if (subscriptions.isOffsetResetNeeded(tp)) {
+            resetOffsetPosition(tp);
+        } else if (subscriptions.committed(tp) == null) {
+            subscriptions.needOffsetReset(tp);
+            resetOffsetPosition(tp);
+        } else {
+            subscriptions.seek(tp, subscriptions.committed(tp).offset());
+        }
+    }
+
+    private void resetOffsetPosition(TopicPartition tp) {
+        OffsetResetStrategy strategy = subscriptions.resetStrategy(tp);
+        Long offset;
+        if (strategy == OffsetResetStrategy.EARLIEST) {
+            offset = beginningOffsets.get(tp);
+            if (offset == null)
+                throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning");
+        } else if (strategy == OffsetResetStrategy.LATEST) {
+            offset = endOffsets.get(tp);
+            if (offset == null)
+                throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end");
+        } else {
+            throw new NoOffsetForPartitionException("No offset available");
+        }
+        seek(tp, offset);
+    }
 }

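For the MockConsumer change, a minimal usage sketch of the new waitForPollThen() hook, assuming a JUnit-style test method and the usual clients imports (the topic name, record contents, and thread structure below are illustrative only, not part of this patch):

    // The polling thread loops on poll(); the driver waits for a poll() call and then
    // safely mutates the mock inside the callback while the poller is parked in poll().
    final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    final TopicPartition tp = new TopicPartition("test", 0);
    consumer.assign(Arrays.asList(tp));
    consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));

    Thread poller = new Thread(new Runnable() {
        @Override
        public void run() {
            // Keep polling until the driver has injected a record.
            while (consumer.poll(1000).count() == 0) { }
        }
    });
    poller.start();

    consumer.waitForPollThen(new Runnable() {
        @Override
        public void run() {
            // The poller is blocked inside poll() here, so addRecord() is safe.
            consumer.addRecord(new ConsumerRecord<String, String>("test", 0, 0, "key", "value"));
        }
    }, 5000);
    poller.join();
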
http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 9b610d8..25a0e90 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -17,6 +17,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -264,7 +265,7 @@ public class SubscriptionState {
     }
 
     public Set<TopicPartition> missingFetchPositions() {
-        Set<TopicPartition> missing = new HashSet<>(this.assignment.keySet());
+        Set<TopicPartition> missing = new HashSet<>();
         for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet())
             if (!entry.getValue().hasValidPosition)
                 missing.add(entry.getKey());
@@ -275,7 +276,7 @@ public class SubscriptionState {
         return this.needsPartitionAssignment;
     }
 
-    public void changePartitionAssignment(List<TopicPartition> assignments) {
+    public void changePartitionAssignment(Collection<TopicPartition> assignments) {
         for (TopicPartition tp : assignments)
             if (!this.subscription.contains(tp.topic()))
                 throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index f702535..fa06be9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 
 import static org.junit.Assert.assertEquals;
@@ -34,6 +35,13 @@ public class MockConsumerTest {
     public void testSimpleMock() {
         consumer.subscribe(Arrays.asList("test"), new NoOpConsumerRebalanceListener());
         assertEquals(0, consumer.poll(1000).count());
+        consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1)));
+        // Mock consumers need to seek manually since they cannot automatically reset offsets
+        HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(new TopicPartition("test", 0), 0L);
+        beginningOffsets.put(new TopicPartition("test", 1), 0L);
+        consumer.updateBeginningOffsets(beginningOffsets);
+        consumer.seek(new TopicPartition("test", 0), 0);
         ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, "key1", "value1");
         ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, "key2", "value2");
         consumer.addRecord(rec1);
@@ -43,9 +51,9 @@ public class MockConsumerTest {
         assertEquals(rec1, iter.next());
         assertEquals(rec2, iter.next());
         assertFalse(iter.hasNext());
-        assertEquals(1L, consumer.position(new TopicPartition("test", 0)));
+        assertEquals(2L, consumer.position(new TopicPartition("test", 0)));
         consumer.commitSync();
-        assertEquals(1L, consumer.committed(new TopicPartition("test", 0)).offset());
+        assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/config/copycat-distributed.properties
----------------------------------------------------------------------
diff --git a/config/copycat-distributed.properties b/config/copycat-distributed.properties
new file mode 100644
index 0000000..654ed24
--- /dev/null
+++ b/config/copycat-distributed.properties
@@ -0,0 +1,39 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+# These are defaults. This file just demonstrates how to override some settings.
+bootstrap.servers=localhost:9092
+
+# The converters specify the format of data in Kafka and how to translate it into Copycat data. Every Copycat user will
+# need to configure these based on the format they want their data in when loaded from or stored into Kafka
+key.converter=org.apache.kafka.copycat.json.JsonConverter
+value.converter=org.apache.kafka.copycat.json.JsonConverter
+# Converter-specific settings can be passed in by prefixing the setting with the name of the converter we want to
+# apply it to
+key.converter.schemas.enable=true
+value.converter.schemas.enable=true
+
+# The offset converter is configurable and must be specified, but most users will always want to use the built-in default.
+# Offset data is never visible outside of Copycat.
+offset.key.converter=org.apache.kafka.copycat.json.JsonConverter
+offset.value.converter=org.apache.kafka.copycat.json.JsonConverter
+offset.key.converter.schemas.enable=false
+offset.value.converter.schemas.enable=false
+
+offset.storage.topic=copycat-offsets
+# Flush much faster than normal, which is useful for testing/debugging
+offset.flush.interval.ms=10000

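The converter-prefix convention above works because the worker strips the prefix before handing the remaining keys to the converter's configure() call (see the originalsWithPrefix() usage in the Worker.java hunk further down). A rough sketch of that stripping, using a hypothetical helper rather than the actual WorkerConfig code:

    // Hypothetical helper: "offset.value.converter.schemas.enable=false" is seen by the
    // converter as "schemas.enable" -> "false".
    static Map<String, Object> stripPrefix(Map<String, ?> workerProps, String prefix) {
        Map<String, Object> converterProps = new HashMap<>();
        for (Map.Entry<String, ?> entry : workerProps.entrySet()) {
            if (entry.getKey().startsWith(prefix))
                converterProps.put(entry.getKey().substring(prefix.length()), entry.getValue());
        }
        return converterProps;
    }
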
http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
new file mode 100644
index 0000000..b5e8896
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.cli;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.runtime.Copycat;
+import org.apache.kafka.copycat.runtime.Herder;
+import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.copycat.storage.KafkaOffsetBackingStore;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * <p>
+ * Command line utility that runs Copycat in distributed mode. In this mode, the process joins a group of other workers
+ * and work is distributed among them. This is useful for running Copycat as a service, where connectors can be
+ * submitted to the cluster to be automatically executed in a scalable, distributed fashion. This also allows you to
+ * easily scale out horizontally, elastically adding or removing capacity simply by starting or stopping worker
+ * instances.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public class CopycatDistributed {
+    private static final Logger log = LoggerFactory.getLogger(CopycatDistributed.class);
+
+    public static void main(String[] args) throws Exception {
+        Properties workerProps;
+        Properties connectorProps;
+
+        if (args.length < 2) {
+            log.info("Usage: CopycatDistributed worker.properties connector1.properties [connector2.properties ...]");
+            System.exit(1);
+        }
+
+        String workerPropsFile = args[0];
+        workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties();
+
+        WorkerConfig workerConfig = new WorkerConfig(workerProps);
+        Worker worker = new Worker(workerConfig, new KafkaOffsetBackingStore());
+        Herder herder = new StandaloneHerder(worker);
+        final Copycat copycat = new Copycat(worker, herder);
+        copycat.start();
+
+        try {
+            for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
+                connectorProps = Utils.loadProps(connectorPropsFile);
+                FutureCallback<String> cb = new FutureCallback<>(new Callback<String>() {
+                    @Override
+                    public void onCompletion(Throwable error, String id) {
+                        if (error != null)
+                            log.error("Failed to create job for {}", connectorPropsFile);
+                    }
+                });
+                herder.addConnector(connectorProps, cb);
+                cb.get();
+            }
+        } catch (Throwable t) {
+            log.error("Stopping after connector error", t);
+            copycat.stop();
+        }
+
+        // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
+        copycat.awaitStop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
index 130a529..12ec154 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
@@ -19,9 +19,11 @@ package org.apache.kafka.copycat.cli;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.runtime.Copycat;
 import org.apache.kafka.copycat.runtime.Herder;
 import org.apache.kafka.copycat.runtime.Worker;
 import org.apache.kafka.copycat.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.copycat.storage.FileOffsetBackingStore;
 import org.apache.kafka.copycat.util.Callback;
 import org.apache.kafka.copycat.util.FutureCallback;
 import org.slf4j.Logger;
@@ -58,9 +60,9 @@ public class CopycatStandalone {
         workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties();
 
         WorkerConfig workerConfig = new WorkerConfig(workerProps);
-        Worker worker = new Worker(workerConfig);
+        Worker worker = new Worker(workerConfig, new FileOffsetBackingStore());
         Herder herder = new StandaloneHerder(worker);
-        final org.apache.kafka.copycat.runtime.Copycat copycat = new org.apache.kafka.copycat.runtime.Copycat(worker, herder);
+        final Copycat copycat = new Copycat(worker, herder);
         copycat.start();
 
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
index 6cbce0b..a34a014 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -59,8 +59,8 @@ public class Worker {
     private KafkaProducer<byte[], byte[]> producer;
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
 
-    public Worker(WorkerConfig config) {
-        this(new SystemTime(), config, null);
+    public Worker(WorkerConfig config, OffsetBackingStore offsetBackingStore) {
+        this(new SystemTime(), config, offsetBackingStore);
     }
 
     @SuppressWarnings("unchecked")
@@ -76,12 +76,8 @@ public class Worker {
         this.offsetValueConverter = config.getConfiguredInstance(WorkerConfig.OFFSET_VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
         this.offsetValueConverter.configure(config.originalsWithPrefix("offset.value.converter."), false);
 
-        if (offsetBackingStore != null) {
-            this.offsetBackingStore = offsetBackingStore;
-        } else {
-            this.offsetBackingStore = new FileOffsetBackingStore();
-            this.offsetBackingStore.configure(config.originals());
-        }
+        this.offsetBackingStore = offsetBackingStore;
+        this.offsetBackingStore.configure(config.originals());
     }
 
     public void start() {
@@ -132,7 +128,7 @@ public class Worker {
         long timeoutMs = limit - time.milliseconds();
         sourceTaskOffsetCommitter.close(timeoutMs);
 
-        offsetBackingStore.start();
+        offsetBackingStore.stop();
 
         log.info("Worker stopped");
     }

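With this change the offset store is injected explicitly instead of Worker silently falling back to a file-backed store. A short sketch of how the two CLI entry points in this patch construct their workers:

    // Standalone mode keeps file-backed offsets; distributed mode plugs in the Kafka-backed store.
    Worker standaloneWorker = new Worker(workerConfig, new FileOffsetBackingStore());
    Worker distributedWorker = new Worker(workerConfig, new KafkaOffsetBackingStore());
    // In both cases Worker now calls offsetBackingStore.configure(config.originals()) itself.
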
http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
index dfa9e78..f707fd6 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
@@ -68,17 +68,12 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
             Object obj = is.readObject();
             if (!(obj instanceof HashMap))
                 throw new CopycatException("Expected HashMap but found " + obj.getClass());
-            HashMap<String, Map<byte[], byte[]>> raw = (HashMap<String, Map<byte[], byte[]>>) obj;
+            Map<byte[], byte[]> raw = (Map<byte[], byte[]>) obj;
             data = new HashMap<>();
-            for (Map.Entry<String, Map<byte[], byte[]>> entry : raw.entrySet()) {
-                HashMap<ByteBuffer, ByteBuffer> converted = new HashMap<>();
-                for (Map.Entry<byte[], byte[]> mapEntry : entry.getValue().entrySet()) {
-                    ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
-                    ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) :
-                            null;
-                    converted.put(key, value);
-                }
-                data.put(entry.getKey(), converted);
+            for (Map.Entry<byte[], byte[]> mapEntry : raw.entrySet()) {
+                ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
+                ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
+                data.put(key, value);
             }
             is.close();
         } catch (FileNotFoundException | EOFException e) {
@@ -92,15 +87,11 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
     protected void save() {
         try {
             ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file));
-            HashMap<String, Map<byte[], byte[]>> raw = new HashMap<>();
-            for (Map.Entry<String, Map<ByteBuffer, ByteBuffer>> entry : data.entrySet()) {
-                HashMap<byte[], byte[]> converted = new HashMap<>();
-                for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : entry.getValue().entrySet()) {
-                    byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
-                    byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;
-                    converted.put(key, value);
-                }
-                raw.put(entry.getKey(), converted);
+            Map<byte[], byte[]> raw = new HashMap<>();
+            for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : data.entrySet()) {
+                byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
+                byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;
+                raw.put(key, value);
             }
             os.writeObject(raw);
             os.close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
new file mode 100644
index 0000000..a74b39c
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConvertingFutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * <p>
+ *     Implementation of OffsetBackingStore that uses a Kafka topic to store offset data.
+ * </p>
+ * <p>
+ *     Internally, this implementation both produces to and consumes from a Kafka topic which stores the offsets.
+ *     It accepts producer and consumer overrides via its configuration but forces some settings to specific values
+ *     to ensure correct behavior (e.g. acks, auto.offset.reset).
+ * </p>
+ */
+public class KafkaOffsetBackingStore implements OffsetBackingStore {
+    private static final Logger log = LoggerFactory.getLogger(KafkaOffsetBackingStore.class);
+
+    public final static String OFFSET_STORAGE_TOPIC_CONFIG = "offset.storage.topic";
+
+    private final static long CREATE_TOPIC_TIMEOUT_MS = 30000;
+
+    private Time time;
+    private Map<String, ?> configs;
+    private String topic;
+    private Consumer<byte[], byte[]> consumer;
+    private Producer<byte[], byte[]> producer;
+    private HashMap<ByteBuffer, ByteBuffer> data;
+
+    private Thread thread;
+    private boolean stopRequested;
+    private Queue<Callback<Void>> readLogEndOffsetCallbacks;
+
+    public KafkaOffsetBackingStore() {
+        this(new SystemTime());
+    }
+
+    public KafkaOffsetBackingStore(Time time) {
+        this.time = time;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        this.configs = configs;
+        topic = (String) configs.get(OFFSET_STORAGE_TOPIC_CONFIG);
+        if (topic == null)
+            throw new CopycatException("Offset storage topic must be specified");
+
+        data = new HashMap<>();
+        stopRequested = false;
+        readLogEndOffsetCallbacks = new ArrayDeque<>();
+    }
+
+    @Override
+    public void start() {
+        log.info("Starting KafkaOffsetBackingStore with topic " + topic);
+
+        producer = createProducer();
+        consumer = createConsumer();
+        List<TopicPartition> partitions = new ArrayList<>();
+
+        // Until we have admin utilities we can use to check for the existence of this topic and create it if it is missing,
+        // we rely on topic auto-creation
+        List<PartitionInfo> partitionInfos = null;
+        long started = time.milliseconds();
+        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+            partitionInfos = consumer.partitionsFor(topic);
+            Utils.sleep(Math.min(time.milliseconds() - started, 1000));
+        }
+        if (partitionInfos == null)
+            throw new CopycatException("Could not look up partition metadata for offset backing store topic in" +
+                    " the allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
+                    " this is your first use of the topic it may have taken too long to create.");
+
+        for (PartitionInfo partition : partitionInfos)
+            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+        consumer.assign(partitions);
+
+        readToLogEnd();
+
+        thread = new WorkThread();
+        thread.start();
+
+        log.info("Finished reading offsets topic and starting KafkaOffsetBackingStore");
+    }
+
+    @Override
+    public void stop() {
+        log.info("Stopping KafkaOffsetBackingStore");
+
+        synchronized (this) {
+            stopRequested = true;
+            consumer.wakeup();
+        }
+
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+            throw new CopycatException("Failed to stop KafkaOffsetBackingStore. Exiting without cleanly shutting " +
+                    "down its producer and consumer.", e);
+        }
+
+        try {
+            producer.close();
+        } catch (KafkaException e) {
+            log.error("Failed to close KafkaOffsetBackingStore producer", e);
+        }
+
+        try {
+            consumer.close();
+        } catch (KafkaException e) {
+            log.error("Failed to close KafkaOffsetBackingStore consumer", e);
+        }
+    }
+
+    @Override
+    public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys,
+                                                   final Callback<Map<ByteBuffer, ByteBuffer>> callback) {
+        ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>(callback) {
+            @Override
+            public Map<ByteBuffer, ByteBuffer> convert(Void result) {
+                Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
+                synchronized (KafkaOffsetBackingStore.this) {
+                    for (ByteBuffer key : keys)
+                        values.put(key, data.get(key));
+                }
+                return values;
+            }
+        };
+        readLogEndOffsetCallbacks.add(future);
+        consumer.wakeup();
+        return future;
+    }
+
+    @Override
+    public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) {
+        SetCallbackFuture producerCallback = new SetCallbackFuture(values.size(), callback);
+
+        for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
+            producer.send(new ProducerRecord<>(topic, entry.getKey().array(), entry.getValue().array()), producerCallback);
+        }
+
+        return producerCallback;
+    }
+
+
+
+    private Producer<byte[], byte[]> createProducer() {
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.putAll(configs);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        return new KafkaProducer<>(producerProps);
+    }
+
+    private Consumer<byte[], byte[]> createConsumer() {
+        Map<String, Object> consumerConfig = new HashMap<>();
+        consumerConfig.putAll(configs);
+        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        return new KafkaConsumer<>(consumerConfig);
+    }
+
+    private void poll(long timeoutMs) {
+        try {
+            ConsumerRecords<byte[], byte[]> records = consumer.poll(timeoutMs);
+            for (ConsumerRecord record : records) {
+                ByteBuffer key = record.key() != null ? ByteBuffer.wrap((byte[]) record.key()) : null;
+                ByteBuffer value = record.value() != null ? ByteBuffer.wrap((byte[]) record.value()) : null;
+                data.put(key, value);
+            }
+        } catch (ConsumerWakeupException e) {
+            // Expected on get() or stop(). The calling code should handle this
+            throw e;
+        } catch (KafkaException e) {
+            log.error("Error polling: " + e);
+        }
+    }
+
+    private void readToLogEnd() {
+        log.trace("Reading to end of offset log");
+
+        Set<TopicPartition> assignment = consumer.assignment();
+
+        // This approach to getting the current end offset is hacky until we have an API for looking these up directly
+        Map<TopicPartition, Long> offsets = new HashMap<>();
+        for (TopicPartition tp : assignment) {
+            long offset = consumer.position(tp);
+            offsets.put(tp, offset);
+            consumer.seekToEnd(tp);
+        }
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        try {
+            poll(0);
+        } finally {
+            // If there is an exception, even a possibly expected one like ConsumerWakeupException, we need to make sure
+            // the consumer's position is reset or it'll get into an inconsistent state.
+            for (TopicPartition tp : assignment) {
+                long startOffset = offsets.get(tp);
+                long endOffset = consumer.position(tp);
+                if (endOffset > startOffset) {
+                    endOffsets.put(tp, endOffset);
+                    consumer.seek(tp, startOffset);
+                }
+                log.trace("Reading to end of log for {}: starting offset {} to ending offset {}", tp, startOffset, endOffset);
+            }
+        }
+
+        while (!endOffsets.isEmpty()) {
+            poll(Integer.MAX_VALUE);
+
+            Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<TopicPartition, Long> entry = it.next();
+                if (consumer.position(entry.getKey()) >= entry.getValue())
+                    it.remove();
+                else
+                    break;
+            }
+        }
+    }
+
+
+    private static class SetCallbackFuture implements org.apache.kafka.clients.producer.Callback, Future<Void> {
+        private int numLeft;
+        private boolean completed = false;
+        private Throwable exception = null;
+        private final Callback<Void> callback;
+
+        public SetCallbackFuture(int numRecords, Callback<Void> callback) {
+            numLeft = numRecords;
+            this.callback = callback;
+        }
+
+        @Override
+        public synchronized void onCompletion(RecordMetadata metadata, Exception exception) {
+            if (exception != null) {
+                if (!completed) {
+                    this.exception = exception;
+                    callback.onCompletion(exception, null);
+                    completed = true;
+                    this.notify();
+                }
+                return;
+            }
+
+            numLeft -= 1;
+            if (numLeft == 0) {
+                callback.onCompletion(null, null);
+                completed = true;
+                this.notify();
+            }
+        }
+
+        @Override
+        public synchronized boolean cancel(boolean mayInterruptIfRunning) {
+            return false;
+        }
+
+        @Override
+        public synchronized boolean isCancelled() {
+            return false;
+        }
+
+        @Override
+        public synchronized boolean isDone() {
+            return completed;
+        }
+
+        @Override
+        public synchronized Void get() throws InterruptedException, ExecutionException {
+            while (!completed) {
+                this.wait();
+            }
+            if (exception != null)
+                throw new ExecutionException(exception);
+            return null;
+        }
+
+        @Override
+        public synchronized Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            long started = System.currentTimeMillis();
+            long limit = started + unit.toMillis(timeout);
+            while (!completed) {
+                long leftMs = limit - System.currentTimeMillis();
+                if (leftMs < 0)
+                    throw new TimeoutException("KafkaOffsetBackingStore Future timed out.");
+                this.wait(leftMs);
+            }
+            if (exception != null)
+                throw new ExecutionException(exception);
+            return null;
+        }
+    }
+
+    private class WorkThread extends Thread {
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    int numCallbacks;
+                    synchronized (KafkaOffsetBackingStore.this) {
+                        if (stopRequested)
+                            break;
+                        numCallbacks = readLogEndOffsetCallbacks.size();
+                    }
+
+                    if (numCallbacks > 0) {
+                        try {
+                            readToLogEnd();
+                        } catch (ConsumerWakeupException e) {
+                            // Either received another get() call and need to retry reading to end of log or stop() was
+                            // called. Both are handled by restarting this loop.
+                            continue;
+                        }
+                    }
+
+                    synchronized (KafkaOffsetBackingStore.this) {
+                        for (int i = 0; i < numCallbacks; i++) {
+                            Callback<Void> cb = readLogEndOffsetCallbacks.poll();
+                            cb.onCompletion(null, null);
+                        }
+                    }
+
+                    try {
+                        poll(Integer.MAX_VALUE);
+                    } catch (ConsumerWakeupException e) {
+                        // See previous comment, both possible causes of this wakeup are handled by starting this loop again
+                        continue;
+                    }
+                }
+            } catch (Throwable t) {
+                log.error("Unexpected exception in KafkaOffsetBackingStore's work thread", t);
+            }
+        }
+    }
+}

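A rough lifecycle sketch for the new store, assuming it runs inside a method declared to throw Exception; the topic name, key, and value bytes below are illustrative and error handling is omitted:

    Map<String, Object> props = new HashMap<>();
    props.put("bootstrap.servers", "localhost:9092");
    props.put(KafkaOffsetBackingStore.OFFSET_STORAGE_TOPIC_CONFIG, "copycat-offsets");

    KafkaOffsetBackingStore store = new KafkaOffsetBackingStore();
    store.configure(props);
    store.start();  // blocks until the existing offsets topic has been read to the end

    ByteBuffer key = ByteBuffer.wrap("my-connector/partition-0".getBytes());
    ByteBuffer value = ByteBuffer.wrap("{\"offset\": 42}".getBytes());

    // set() produces the entries; the returned Future completes once every send is acked.
    store.set(Collections.singletonMap(key, value), new Callback<Void>() {
        @Override
        public void onCompletion(Throwable error, Void result) { }
    }).get();

    // get() completes after the work thread has re-read to the end of the log.
    Map<ByteBuffer, ByteBuffer> read = store.get(Collections.singleton(key), new Callback<Map<ByteBuffer, ByteBuffer>>() {
        @Override
        public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) { }
    }).get();

    store.stop();
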
http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
index 6ffba58..11a1b89 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
@@ -38,7 +38,7 @@ import java.util.concurrent.Future;
 public class MemoryOffsetBackingStore implements OffsetBackingStore {
     private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class);
 
-    protected HashMap<String, Map<ByteBuffer, ByteBuffer>> data = new HashMap<>();
+    protected Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
     protected ExecutorService executor = Executors.newSingleThreadExecutor();
 
     public MemoryOffsetBackingStore() {
@@ -60,18 +60,15 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
 
     @Override
     public Future<Map<ByteBuffer, ByteBuffer>> get(
-            final String namespace, final Collection<ByteBuffer> keys,
+            final Collection<ByteBuffer> keys,
             final Callback<Map<ByteBuffer, ByteBuffer>> callback) {
         return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() {
             @Override
             public Map<ByteBuffer, ByteBuffer> call() throws Exception {
                 Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
                 synchronized (MemoryOffsetBackingStore.this) {
-                    Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
-                    if (namespaceData == null)
-                        return result;
                     for (ByteBuffer key : keys) {
-                        result.put(key, namespaceData.get(key));
+                        result.put(key, data.get(key));
                     }
                 }
                 if (callback != null)
@@ -83,19 +80,14 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
     }
 
     @Override
-    public Future<Void> set(final String namespace, final Map<ByteBuffer, ByteBuffer> values,
+    public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values,
                             final Callback<Void> callback) {
         return executor.submit(new Callable<Void>() {
             @Override
             public Void call() throws Exception {
                 synchronized (MemoryOffsetBackingStore.this) {
-                    Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
-                    if (namespaceData == null) {
-                        namespaceData = new HashMap<>();
-                        data.put(namespace, namespaceData);
-                    }
                     for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
-                        namespaceData.put(entry.getKey(), entry.getValue());
+                        data.put(entry.getKey(), entry.getValue());
                     }
                     save();
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
index e8cb2ae..239d9a8 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
@@ -34,8 +34,8 @@ import java.util.concurrent.Future;
  * </p>
  * <p>
  * Since OffsetBackingStore is a shared resource that may be used by many OffsetStorage instances
- * that are associated with individual tasks, all operations include a namespace which should be
- * used to isolate different key spaces.
+ * that are associated with individual tasks, callers must include enough information about the
+ * connector in each key to avoid collisions within the shared key space.
  * </p>
  */
 public interface OffsetBackingStore extends Configurable {
@@ -53,22 +53,20 @@ public interface OffsetBackingStore extends Configurable {
 
     /**
      * Get the values for the specified keys
-     * @param namespace prefix for the keys in this request
      * @param keys list of keys to look up
      * @param callback callback to invoke on completion
      * @return future for the resulting map from key to value
      */
     public Future<Map<ByteBuffer, ByteBuffer>> get(
-            String namespace, Collection<ByteBuffer> keys,
+            Collection<ByteBuffer> keys,
             Callback<Map<ByteBuffer, ByteBuffer>> callback);
 
     /**
      * Set the specified keys and values.
-     * @param namespace prefix for the keys in this request
      * @param values map from key to value
      * @param callback callback to invoke on completion
      * @return void future for the operation
      */
-    public Future<Void> set(String namespace, Map<ByteBuffer, ByteBuffer> values,
+    public Future<Void> set(Map<ByteBuffer, ByteBuffer> values,
                             Callback<Void> callback);
 }
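
For illustration, a minimal sketch of the contract described in the Javadoc above: with the
namespace parameter gone, a caller embeds the connector name in each key so that entries from
different connectors cannot collide. The key encoding and names here are purely illustrative;
the real code serializes a [namespace, key] list through a Converter (see OffsetStorageWriter
below).

    import org.apache.kafka.copycat.storage.OffsetBackingStore;

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.Map;
    import java.util.concurrent.Future;

    public class OffsetKeyExample {
        // Toy key encoding: "<connector>:<key>" as UTF-8 bytes, just to show that the
        // connector name must be part of the key in the shared key space.
        static ByteBuffer offsetKey(String connector, String key) {
            return ByteBuffer.wrap((connector + ":" + key).getBytes(StandardCharsets.UTF_8));
        }

        static Future<Map<ByteBuffer, ByteBuffer>> readOffsets(OffsetBackingStore store) {
            // A null callback mirrors OffsetStorageReaderImpl, which simply blocks on the Future.
            return store.get(Arrays.asList(offsetKey("my-connector", "partition-0")), null);
        }
    }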

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
index 7521955..dbb3d0d 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
@@ -62,7 +62,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
             try {
                 // Offsets are treated as schemaless, their format is only validated here (and the returned value below)
                 OffsetUtils.validateFormat(key);
-                byte[] keySerialized = keyConverter.fromCopycatData(namespace, null, key);
+                byte[] keySerialized = keyConverter.fromCopycatData(namespace, null, Arrays.asList(namespace, key));
                 ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
                 serializedToOriginal.put(keyBuffer, key);
             } catch (Throwable t) {
@@ -75,7 +75,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
         // Get serialized key -> serialized value from backing store
         Map<ByteBuffer, ByteBuffer> raw;
         try {
-            raw = backingStore.get(namespace, serializedToOriginal.keySet(), null).get();
+            raw = backingStore.get(serializedToOriginal.keySet(), null).get();
         } catch (Exception e) {
             log.error("Failed to fetch offsets from namespace {}: ", namespace, e);
             throw new CopycatException("Failed to fetch offsets.", e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
index be8c718..59c12a7 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
@@ -139,7 +139,8 @@ public class OffsetStorageWriter {
                 // for that data. The only enforcement of the format is here.
                 OffsetUtils.validateFormat(entry.getKey());
                 OffsetUtils.validateFormat(entry.getValue());
-                byte[] key = keyConverter.fromCopycatData(namespace, null, entry.getKey());
+                // When serializing the key, we prepend the namespace so the serialized key is [namespace, real key]
+                byte[] key = keyConverter.fromCopycatData(namespace, null, Arrays.asList(namespace, entry.getKey()));
                 ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
                 byte[] value = valueConverter.fromCopycatData(namespace, null, entry.getValue());
                 ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
@@ -158,7 +159,7 @@ public class OffsetStorageWriter {
 
         // And submit the data
         log.debug("Submitting {} entries to backing store", offsetsSerialized.size());
-        return backingStore.set(namespace, offsetsSerialized, new Callback<Void>() {
+        return backingStore.set(offsetsSerialized, new Callback<Void>() {
             @Override
             public void onCompletion(Throwable error, Void result) {
                 boolean isCurrent = handleFinishWrite(flushId, error, result);
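
For concreteness, a sketch of the key layout described in the comment above: the writer wraps
every key as [namespace, original key] before handing it to the key converter, so with a JSON
converter and schemas disabled the serialized key would come out roughly as
["local-file-source", {"filename": "test.txt"}]. The connector name and key contents below are
hypothetical.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class OffsetKeyLayoutExample {
        public static void main(String[] args) {
            String namespace = "local-file-source";   // hypothetical connector name
            Object originalKey = Collections.singletonMap("filename", "test.txt");
            // This is the structure handed to keyConverter.fromCopycatData(...) above.
            List<Object> wrappedKey = Arrays.asList(namespace, originalKey);
            System.out.println(wrappedKey);   // [local-file-source, {filename=test.txt}]
        }
    }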

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
new file mode 100644
index 0000000..6bf3885
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Future<T> {
+
+    private Callback<T> underlying;
+    private CountDownLatch finishedLatch;
+    private T result = null;
+    private Throwable exception = null;
+
+    public ConvertingFutureCallback(Callback<T> underlying) {
+        this.underlying = underlying;
+        this.finishedLatch = new CountDownLatch(1);
+    }
+
+    public abstract T convert(U result);
+
+    @Override
+    public void onCompletion(Throwable error, U result) {
+        this.exception = error;
+        this.result = convert(result);
+        if (underlying != null)
+            underlying.onCompletion(error, this.result);
+        finishedLatch.countDown();
+    }
+
+    @Override
+    public boolean cancel(boolean b) {
+        return false;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return false;
+    }
+
+    @Override
+    public boolean isDone() {
+        return finishedLatch.getCount() == 0;
+    }
+
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        finishedLatch.await();
+        return result();
+    }
+
+    @Override
+    public T get(long l, TimeUnit timeUnit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        if (!finishedLatch.await(l, timeUnit)) throw new TimeoutException();
+        return result();
+    }
+
+    private T result() throws ExecutionException {
+        if (exception != null) {
+            throw new ExecutionException(exception);
+        }
+        return result;
+    }
+}
+
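
For illustration, a minimal sketch of how ConvertingFutureCallback can be used: a subclass
supplies convert() to adapt the raw completion value before it is exposed through the Future.
The subclass and types below are illustrative, not part of this patch.

    import org.apache.kafka.copycat.util.ConvertingFutureCallback;

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Completes with raw bytes but exposes a decoded String through get().
    public class StringFromBytesCallback extends ConvertingFutureCallback<ByteBuffer, String> {
        public StringFromBytesCallback() {
            super(null);   // no underlying callback to chain to
        }

        @Override
        public String convert(ByteBuffer result) {
            return result == null ? null : StandardCharsets.UTF_8.decode(result).toString();
        }
    }

    // Usage: some asynchronous operation eventually calls cb.onCompletion(null, bytes);
    // a waiting thread calls cb.get() and receives the converted String.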

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
index db9f2c4..61e04b6 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
@@ -17,60 +17,14 @@
 
 package org.apache.kafka.copycat.util;
 
-import java.util.concurrent.*;
-
-public class FutureCallback<T> implements Callback<T>, Future<T> {
-
-    private Callback<T> underlying;
-    private CountDownLatch finishedLatch;
-    private T result = null;
-    private Throwable exception = null;
+public class FutureCallback<T> extends ConvertingFutureCallback<T, T> {
 
     public FutureCallback(Callback<T> underlying) {
-        this.underlying = underlying;
-        this.finishedLatch = new CountDownLatch(1);
-    }
-
-    @Override
-    public void onCompletion(Throwable error, T result) {
-        underlying.onCompletion(error, result);
-        this.exception = error;
-        this.result = result;
-        finishedLatch.countDown();
+        super(underlying);
     }
 
     @Override
-    public boolean cancel(boolean b) {
-        return false;
-    }
-
-    @Override
-    public boolean isCancelled() {
-        return false;
-    }
-
-    @Override
-    public boolean isDone() {
-        return finishedLatch.getCount() == 0;
-    }
-
-    @Override
-    public T get() throws InterruptedException, ExecutionException {
-        finishedLatch.await();
-        return result();
-    }
-
-    @Override
-    public T get(long l, TimeUnit timeUnit)
-            throws InterruptedException, ExecutionException, TimeoutException {
-        finishedLatch.await(l, timeUnit);
-        return result();
-    }
-
-    private T result() throws ExecutionException {
-        if (exception != null) {
-            throw new ExecutionException(exception);
-        }
+    public T convert(T result) {
         return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
index 701e230..e75d2f9 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -37,6 +37,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 @RunWith(PowerMockRunner.class)
@@ -45,6 +46,7 @@ import java.util.Properties;
 public class WorkerTest extends ThreadedTest {
 
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private WorkerConfig config;
     private Worker worker;
     private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
 
@@ -59,13 +61,16 @@ public class WorkerTest extends ThreadedTest {
         workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("offset.key.converter.schemas.enable", "false");
         workerProps.setProperty("offset.value.converter.schemas.enable", "false");
-        WorkerConfig config = new WorkerConfig(workerProps);
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
-        worker.start();
+        config = new WorkerConfig(workerProps);
     }
 
     @Test
     public void testAddRemoveTask() throws Exception {
+        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        EasyMock.expectLastCall();
+        offsetBackingStore.start();
+        EasyMock.expectLastCall();
+
         ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
 
         // Create
@@ -96,8 +101,13 @@ public class WorkerTest extends ThreadedTest {
         workerTask.close();
         EasyMock.expectLastCall();
 
+        offsetBackingStore.stop();
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
+        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker.start();
         worker.addTask(taskId, TestSourceTask.class.getName(), origProps);
         worker.stopTask(taskId);
         // Nothing should be left, so this should effectively be a nop
@@ -108,11 +118,26 @@ public class WorkerTest extends ThreadedTest {
 
     @Test(expected = CopycatException.class)
     public void testStopInvalidTask() {
+        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        EasyMock.expectLastCall();
+        offsetBackingStore.start();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker.start();
+
         worker.stopTask(taskId);
     }
 
     @Test
     public void testCleanupTasksOnStop() throws Exception {
+        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        EasyMock.expectLastCall();
+        offsetBackingStore.start();
+        EasyMock.expectLastCall();
+
         // Create
         TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
@@ -142,8 +167,13 @@ public class WorkerTest extends ThreadedTest {
         workerTask.close();
         EasyMock.expectLastCall();
 
+        offsetBackingStore.stop();
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
+        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker.start();
         worker.addTask(taskId, TestSourceTask.class.getName(), origProps);
         worker.stop();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
index bbcbdc9..2976c0a 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
@@ -67,9 +67,9 @@ public class FileOffsetBackingStoreTest {
         Callback<Map<ByteBuffer, ByteBuffer>> getCallback = expectSuccessfulGetCallback();
         PowerMock.replayAll();
 
-        store.set("namespace", firstSet, setCallback).get();
+        store.set(firstSet, setCallback).get();
 
-        Map<ByteBuffer, ByteBuffer> values = store.get("namespace", Arrays.asList(buffer("key"), buffer("bad")), getCallback).get();
+        Map<ByteBuffer, ByteBuffer> values = store.get(Arrays.asList(buffer("key"), buffer("bad")), getCallback).get();
         assertEquals(buffer("value"), values.get(buffer("key")));
         assertEquals(null, values.get(buffer("bad")));
 
@@ -82,14 +82,14 @@ public class FileOffsetBackingStoreTest {
         Callback<Map<ByteBuffer, ByteBuffer>> getCallback = expectSuccessfulGetCallback();
         PowerMock.replayAll();
 
-        store.set("namespace", firstSet, setCallback).get();
+        store.set(firstSet, setCallback).get();
         store.stop();
 
         // Restore into a new store to ensure correct reload from scratch
         FileOffsetBackingStore restore = new FileOffsetBackingStore();
         restore.configure(props);
         restore.start();
-        Map<ByteBuffer, ByteBuffer> values = restore.get("namespace", Arrays.asList(buffer("key")), getCallback).get();
+        Map<ByteBuffer, ByteBuffer> values = restore.get(Arrays.asList(buffer("key")), getCallback).get();
         assertEquals(buffer("value"), values.get(buffer("key")));
 
         PowerMock.verifyAll();

