kafka-commits mailing list archives

From gwens...@apache.org
Subject [1/3] kafka git commit: KAFKA-2372: Add Kafka-backed storage of Copycat configs.
Date Tue, 13 Oct 2015 17:23:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e2ec02e1d -> 36d446932
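
The structural change in the first test file below is that KafkaOffsetBackingStoreTest no longer drives a KafkaProducer and MockConsumer directly; it mocks the new KafkaBasedLog abstraction, and the raw producer/consumer behaviour moves into the new KafkaBasedLogTest. For orientation, here is a minimal usage sketch of the API surface those mocks stand in for, inferred from the tests in this patch rather than taken from the commit itself; the class name, the "copycat-log" and "broker1:9092" values, and the use of a public constructor taking a SystemTime are assumptions.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.FutureCallback;
import org.apache.kafka.copycat.util.KafkaBasedLog;

public class KafkaBasedLogUsageSketch {
    public static void main(String[] args) throws Exception {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Every record read from the topic, both during start() and during readToEnd(), is handed to this callback.
        Callback<ConsumerRecord<String, String>> consumedCallback = new Callback<ConsumerRecord<String, String>>() {
            @Override
            public void onCompletion(Throwable error, ConsumerRecord<String, String> record) {
                System.out.println("consumed: " + record.key() + " -> " + record.value());
            }
        };

        KafkaBasedLog<String, String> log = new KafkaBasedLog<>("copycat-log", producerProps, consumerProps, consumedCallback, new SystemTime());

        log.start(); // replays existing records into consumedCallback, then keeps tailing the topic

        // Asynchronous produce; the producer callback fires once the record is acked.
        log.send("key", "value", new org.apache.kafka.clients.producer.Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                System.out.println("sent, error=" + exception);
            }
        });

        // readToEnd() flushes pending sends and consumes up to the current end offsets before completing the callback.
        FutureCallback<Void> readToEnd = new FutureCallback<>(new Callback<Void>() {
            @Override
            public void onCompletion(Throwable error, Void result) {
                System.out.println("caught up to log end, error=" + error);
            }
        });
        log.readToEnd(readToEnd);
        readToEnd.get(10000, TimeUnit.MILLISECONDS);

        log.stop();
    }
}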


http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java
index 6a3eec3..69d9ab4 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java
@@ -18,23 +18,16 @@
 package org.apache.kafka.copycat.storage;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.LeaderNotAvailableException;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.util.Callback;
-import org.apache.kafka.copycat.util.TestFuture;
+import org.apache.kafka.copycat.util.KafkaBasedLog;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
+import org.easymock.IAnswer;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -49,70 +42,56 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.apache.kafka.copycat.util.ByteArrayProducerRecordEquals.eqProducerRecord;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(KafkaOffsetBackingStore.class)
 @PowerMockIgnore("javax.management.*")
 public class KafkaOffsetBackingStoreTest {
     private static final String TOPIC = "copycat-offsets";
-    private static final TopicPartition TP0 = new TopicPartition(TOPIC, 0);
-    private static final TopicPartition TP1 = new TopicPartition(TOPIC, 1);
     private static final Map<String, String> DEFAULT_PROPS = new HashMap<>();
     static {
         DEFAULT_PROPS.put(KafkaOffsetBackingStore.OFFSET_STORAGE_TOPIC_CONFIG, TOPIC);
         DEFAULT_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
     }
-    private static final Set<TopicPartition> CONSUMER_ASSIGNMENT = new HashSet<>(Arrays.asList(TP0, TP1));
     private static final Map<ByteBuffer, ByteBuffer> FIRST_SET = new HashMap<>();
     static {
         FIRST_SET.put(buffer("key"), buffer("value"));
         FIRST_SET.put(null, null);
     }
 
-
-    private static final Node LEADER = new Node(1, "broker1", 9092);
-    private static final Node REPLICA = new Node(1, "broker2", 9093);
-
-    private static final PartitionInfo TPINFO0 = new PartitionInfo(TOPIC, 0, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
-    private static final PartitionInfo TPINFO1 = new PartitionInfo(TOPIC, 1, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
-
     private static final ByteBuffer TP0_KEY = buffer("TP0KEY");
     private static final ByteBuffer TP1_KEY = buffer("TP1KEY");
+    private static final ByteBuffer TP2_KEY = buffer("TP2KEY");
     private static final ByteBuffer TP0_VALUE = buffer("VAL0");
     private static final ByteBuffer TP1_VALUE = buffer("VAL1");
+    private static final ByteBuffer TP2_VALUE = buffer("VAL2");
     private static final ByteBuffer TP0_VALUE_NEW = buffer("VAL0_NEW");
     private static final ByteBuffer TP1_VALUE_NEW = buffer("VAL1_NEW");
 
+    @Mock
+    KafkaBasedLog<byte[], byte[]> storeLog;
     private KafkaOffsetBackingStore store;
 
-    @Mock private KafkaProducer<byte[], byte[]> producer;
-    private MockConsumer<byte[], byte[]> consumer;
+    private Capture<String> capturedTopic = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
+    private Capture<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
 
     @Before
     public void setUp() throws Exception {
-        store = PowerMock.createPartialMockAndInvokeDefaultConstructor(KafkaOffsetBackingStore.class, new String[]{"createConsumer", "createProducer"});
-        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));
-        Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
-        beginningOffsets.put(TP0, 0L);
-        beginningOffsets.put(TP1, 0L);
-        consumer.updateBeginningOffsets(beginningOffsets);
+        store = PowerMock.createPartialMockAndInvokeDefaultConstructor(KafkaOffsetBackingStore.class, new String[]{"createKafkaBasedLog"});
     }
 
     @Test(expected = CopycatException.class)
@@ -123,119 +102,121 @@ public class KafkaOffsetBackingStoreTest {
 
     @Test
     public void testStartStop() throws Exception {
-        expectStart();
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST);
         expectStop();
 
         PowerMock.replayAll();
 
         store.configure(DEFAULT_PROPS);
+        assertEquals(TOPIC, capturedTopic.getValue());
+        assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
 
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        endOffsets.put(TP0, 0L);
-        endOffsets.put(TP1, 0L);
-        consumer.updateEndOffsets(endOffsets);
         store.start();
-        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
-
         store.stop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
         PowerMock.verifyAll();
     }
 
     @Test
     public void testReloadOnStart() throws Exception {
-        expectStart();
+        expectConfigure();
+        expectStart(Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()),
+                new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()),
+                new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY.array(), TP0_VALUE_NEW.array()),
+                new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY.array(), TP1_VALUE_NEW.array())
+        ));
         expectStop();
 
         PowerMock.replayAll();
 
         store.configure(DEFAULT_PROPS);
-
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        endOffsets.put(TP0, 1L);
-        endOffsets.put(TP1, 1L);
-        consumer.updateEndOffsets(endOffsets);
-        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
-            @Override
-            public void run() {
-                // Needs to seek to end to find end offsets
-                consumer.waitForPoll(10000);
-
-                // Should keep polling until it reaches current log end offset for all partitions
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()));
-                    }
-                }, 10000);
-
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()));
-                    }
-                }, 10000);
-            }
-        };
-        startConsumerOpsThread.start();
         store.start();
-        startConsumerOpsThread.join(10000);
-        assertFalse(startConsumerOpsThread.isAlive());
-        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
         HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
-        assertEquals(TP0_VALUE, data.get(TP0_KEY));
-        assertEquals(TP1_VALUE, data.get(TP1_KEY));
+        assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY));
+        assertEquals(TP1_VALUE_NEW, data.get(TP1_KEY));
 
         store.stop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
         PowerMock.verifyAll();
     }
 
     @Test
     public void testGetSet() throws Exception {
-        expectStart();
-        TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
-        ProducerRecord<byte[], byte[]> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY.array(), TP0_VALUE.array());
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST);
+        expectStop();
+
+        // First get() against an empty store
+        final Capture<Callback<Void>> firstGetReadToEndCallback = EasyMock.newCapture();
+        storeLog.readToEnd(EasyMock.capture(firstGetReadToEndCallback));
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                firstGetReadToEndCallback.getValue().onCompletion(null, null);
+                return null;
+            }
+        });
+
+        // Set offsets
+        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
+        PowerMock.expectLastCall();
         Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
-        EasyMock.expect(producer.send(eqProducerRecord(tp0Record), EasyMock.capture(callback1))).andReturn(tp0Future);
-        TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
-        ProducerRecord<byte[], byte[]> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY.array(), TP1_VALUE.array());
-        Capture<org.apache.kafka.clients.producer.Callback> callback2 = EasyMock.newCapture();
-        EasyMock.expect(producer.send(eqProducerRecord(tp1Record), EasyMock.capture(callback2))).andReturn(tp1Future);
+        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
+        PowerMock.expectLastCall();
 
-        expectStop();
+        // Second get() should get the produced data and return the new values
+        final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
+        storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()));
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()));
+                secondGetReadToEndCallback.getValue().onCompletion(null, null);
+                return null;
+            }
+        });
+
+        // Third get() should pick up data produced by someone else and return those values
+        final Capture<Callback<Void>> thirdGetReadToEndCallback = EasyMock.newCapture();
+        storeLog.readToEnd(EasyMock.capture(thirdGetReadToEndCallback));
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY.array(), TP0_VALUE_NEW.array()));
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY.array(), TP1_VALUE_NEW.array()));
+                thirdGetReadToEndCallback.getValue().onCompletion(null, null);
+                return null;
+            }
+        });
 
         PowerMock.replayAll();
 
+
+
         store.configure(DEFAULT_PROPS);
+        store.start();
 
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        endOffsets.put(TP0, 0L);
-        endOffsets.put(TP1, 0L);
-        consumer.updateEndOffsets(endOffsets);
-        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
+        // Getting from empty store should return nulls
+        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
+        store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
             @Override
-            public void run() {
-                // Should keep polling until it has partition info
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.seek(TP0, 0);
-                        consumer.seek(TP1, 0);
-                    }
-                }, 10000);
+            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
+                // Since we didn't read them yet, these will be null
+                assertEquals(null, result.get(TP0_KEY));
+                assertEquals(null, result.get(TP1_KEY));
+                getInvokedAndPassed.set(true);
             }
-        };
-        startConsumerOpsThread.start();
-        store.start();
-        startConsumerOpsThread.join(10000);
-        assertFalse(startConsumerOpsThread.isAlive());
-        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        }).get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(getInvokedAndPassed.get());
 
+        // Set some offsets
         Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
         toSet.put(TP0_KEY, TP0_VALUE);
         toSet.put(TP1_KEY, TP1_VALUE);
@@ -247,208 +228,126 @@ public class KafkaOffsetBackingStoreTest {
             }
         });
         assertFalse(setFuture.isDone());
-        tp1Future.resolve((RecordMetadata) null); // Output not used, so safe to not return a real value for testing
-        assertFalse(setFuture.isDone());
-        tp0Future.resolve((RecordMetadata) null);
         // Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
         // for the store's set callback
-        callback2.getValue().onCompletion(null, null);
-        assertFalse(invoked.get());
         callback1.getValue().onCompletion(null, null);
+        assertFalse(invoked.get());
+        callback0.getValue().onCompletion(null, null);
         setFuture.get(10000, TimeUnit.MILLISECONDS);
         assertTrue(invoked.get());
 
-        // Getting data should continue to return old data...
-        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
+        // Getting data should read to end of our published data and return it
+        final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false);
         store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
             @Override
             public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
-                // Since we didn't read them yet, these will be null
-                assertEquals(null, result.get(TP0_KEY));
-                assertEquals(null, result.get(TP1_KEY));
-                getInvokedAndPassed.set(true);
+                assertEquals(TP0_VALUE, result.get(TP0_KEY));
+                assertEquals(TP1_VALUE, result.get(TP1_KEY));
+                secondGetInvokedAndPassed.set(true);
             }
         }).get(10000, TimeUnit.MILLISECONDS);
-        assertTrue(getInvokedAndPassed.get());
-
-        // Until the consumer gets the new data
-        Thread readNewDataThread = new Thread("read-new-data-thread") {
-            @Override
-            public void run() {
-                // Should keep polling until it reaches current log end offset for all partitions
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()));
-                    }
-                }, 10000);
-
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE_NEW.array()));
-                    }
-                }, 10000);
-            }
-        };
-        readNewDataThread.start();
-        readNewDataThread.join(10000);
-        assertFalse(readNewDataThread.isAlive());
+        assertTrue(secondGetInvokedAndPassed.get());
 
-        // And now the new data should be returned
-        final AtomicBoolean finalGetInvokedAndPassed = new AtomicBoolean(false);
+        // Getting data should read to end of our published data and return it
+        final AtomicBoolean thirdGetInvokedAndPassed = new AtomicBoolean(false);
         store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
             @Override
             public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
                 assertEquals(TP0_VALUE_NEW, result.get(TP0_KEY));
                 assertEquals(TP1_VALUE_NEW, result.get(TP1_KEY));
-                finalGetInvokedAndPassed.set(true);
+                thirdGetInvokedAndPassed.set(true);
             }
         }).get(10000, TimeUnit.MILLISECONDS);
-        assertTrue(finalGetInvokedAndPassed.get());
+        assertTrue(thirdGetInvokedAndPassed.get());
 
         store.stop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
         PowerMock.verifyAll();
     }
 
     @Test
-    public void testConsumerError() throws Exception {
-        expectStart();
+    public void testSetFailure() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST);
         expectStop();
 
-        PowerMock.replayAll();
-
-        store.configure(DEFAULT_PROPS);
-
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        endOffsets.put(TP0, 1L);
-        endOffsets.put(TP1, 1L);
-        consumer.updateEndOffsets(endOffsets);
-        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
-            @Override
-            public void run() {
-                // Trigger exception
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.setException(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception());
-                    }
-                }, 10000);
-
-                // Should keep polling until it reaches current log end offset for all partitions
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()));
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()));
-                    }
-                }, 10000);
-            }
-        };
-        startConsumerOpsThread.start();
-        store.start();
-        startConsumerOpsThread.join(10000);
-        assertFalse(startConsumerOpsThread.isAlive());
-        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
-
-        store.stop();
-
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testProducerError() throws Exception {
-        expectStart();
-        TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
-        ProducerRecord<byte[], byte[]> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY.array(), TP0_VALUE.array());
+        // Set offsets
+        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
+        PowerMock.expectLastCall();
         Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
-        EasyMock.expect(producer.send(eqProducerRecord(tp0Record), EasyMock.capture(callback1))).andReturn(tp0Future);
-        TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
-        ProducerRecord<byte[], byte[]> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY.array(), TP1_VALUE.array());
+        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
+        PowerMock.expectLastCall();
         Capture<org.apache.kafka.clients.producer.Callback> callback2 = EasyMock.newCapture();
-        EasyMock.expect(producer.send(eqProducerRecord(tp1Record), EasyMock.capture(callback2))).andReturn(tp1Future);
-
-        expectStop();
+        storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2));
+        PowerMock.expectLastCall();
 
         PowerMock.replayAll();
 
-        store.configure(DEFAULT_PROPS);
 
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        endOffsets.put(TP0, 0L);
-        endOffsets.put(TP1, 0L);
-        consumer.updateEndOffsets(endOffsets);
-        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
-            @Override
-            public void run() {
-                // Should keep polling until it has partition info
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.seek(TP0, 0);
-                        consumer.seek(TP1, 0);
-                    }
-                }, 10000);
-            }
-        };
-        startConsumerOpsThread.start();
+
+        store.configure(DEFAULT_PROPS);
         store.start();
-        startConsumerOpsThread.join(10000);
-        assertFalse(startConsumerOpsThread.isAlive());
-        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
 
+        // Set some offsets
         Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
         toSet.put(TP0_KEY, TP0_VALUE);
         toSet.put(TP1_KEY, TP1_VALUE);
-        final AtomicReference<Throwable> setException = new AtomicReference<>();
+        toSet.put(TP2_KEY, TP2_VALUE);
+        final AtomicBoolean invoked = new AtomicBoolean(false);
+        final AtomicBoolean invokedFailure = new AtomicBoolean(false);
         Future<Void> setFuture = store.set(toSet, new Callback<Void>() {
             @Override
             public void onCompletion(Throwable error, Void result) {
-                assertNull(setException.get()); // Should only be invoked once
-                setException.set(error);
+                invoked.set(true);
+                if (error != null)
+                    invokedFailure.set(true);
             }
         });
         assertFalse(setFuture.isDone());
-        KafkaException exc = new LeaderNotAvailableException("Error");
-        tp1Future.resolve(exc);
-        callback2.getValue().onCompletion(null, exc);
-        // One failure should resolve the future immediately
+        // Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
+        // for the store's set callback
+        callback1.getValue().onCompletion(null, null);
+        assertFalse(invoked.get());
+        callback2.getValue().onCompletion(null, new KafkaException("bogus error"));
+        assertTrue(invoked.get());
+        assertTrue(invokedFailure.get());
+        callback0.getValue().onCompletion(null, null);
         try {
             setFuture.get(10000, TimeUnit.MILLISECONDS);
-            fail("Should have see ExecutionException");
+            fail("Should have seen KafkaException thrown when waiting on KafkaOffsetBackingStore.set() future");
         } catch (ExecutionException e) {
             // expected
+            assertNotNull(e.getCause());
+            assertTrue(e.getCause() instanceof KafkaException);
         }
-        assertNotNull(setException.get());
-
-        // Callbacks can continue to arrive
-        tp0Future.resolve((RecordMetadata) null);
-        callback1.getValue().onCompletion(null, null);
 
         store.stop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
         PowerMock.verifyAll();
     }
 
+    private void expectConfigure() throws Exception {
+        PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
+                EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback))
+                .andReturn(storeLog);
+    }
 
-    private void expectStart() throws Exception {
-        PowerMock.expectPrivate(store, "createProducer")
-                .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                .andReturn(consumer);
+    private void expectStart(final List<ConsumerRecord<byte[], byte[]>> preexistingRecords) throws Exception {
+        storeLog.start();
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                for (ConsumerRecord<byte[], byte[]> rec : preexistingRecords)
+                    capturedConsumedCallback.getValue().onCompletion(null, rec);
+                return null;
+            }
+        });
     }
 
     private void expectStop() {
-        producer.close();
+        storeLog.stop();
         PowerMock.expectLastCall();
-        // MockConsumer close is checked after test.
     }
 
     private static ByteBuffer buffer(String v) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
new file mode 100644
index 0000000..96c4bcc
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.Time;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class KafkaBasedLogTest {
+
+    private static final String TOPIC = "copycat-log";
+    private static final TopicPartition TP0 = new TopicPartition(TOPIC, 0);
+    private static final TopicPartition TP1 = new TopicPartition(TOPIC, 1);
+    private static final Map<String, Object> PRODUCER_PROPS = new HashMap<>();
+    static {
+        PRODUCER_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+        PRODUCER_PROPS.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        PRODUCER_PROPS.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+    }
+    private static final Map<String, Object> CONSUMER_PROPS = new HashMap<>();
+    static {
+        CONSUMER_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+        CONSUMER_PROPS.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        CONSUMER_PROPS.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+    }
+
+    private static final Set<TopicPartition> CONSUMER_ASSIGNMENT = new HashSet<>(Arrays.asList(TP0, TP1));
+    private static final Map<String, String> FIRST_SET = new HashMap<>();
+    static {
+        FIRST_SET.put("key", "value");
+        FIRST_SET.put(null, null);
+    }
+
+    private static final Node LEADER = new Node(1, "broker1", 9092);
+    private static final Node REPLICA = new Node(1, "broker2", 9093);
+
+    private static final PartitionInfo TPINFO0 = new PartitionInfo(TOPIC, 0, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
+    private static final PartitionInfo TPINFO1 = new PartitionInfo(TOPIC, 1, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
+
+    private static final String TP0_KEY = "TP0KEY";
+    private static final String TP1_KEY = "TP1KEY";
+    private static final String TP0_VALUE = "VAL0";
+    private static final String TP1_VALUE = "VAL1";
+    private static final String TP0_VALUE_NEW = "VAL0_NEW";
+    private static final String TP1_VALUE_NEW = "VAL1_NEW";
+
+    private Time time = new MockTime();
+    private KafkaBasedLog<String, String> store;
+
+    @Mock
+    private KafkaProducer<String, String> producer;
+    private MockConsumer<String, String> consumer;
+
+    private List<ConsumerRecord<String, String>> consumedRecords = new ArrayList<>();
+    private Callback<ConsumerRecord<String, String>> consumedCallback = new Callback<ConsumerRecord<String, String>>() {
+        @Override
+        public void onCompletion(Throwable error, ConsumerRecord<String, String> record) {
+            consumedRecords.add(record);
+        }
+    };
+
+    @Before
+    public void setUp() throws Exception {
+        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
+                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time);
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));
+        Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(TP0, 0L);
+        beginningOffsets.put(TP1, 0L);
+        consumer.updateBeginningOffsets(beginningOffsets);
+    }
+
+    @Test
+    public void testStartStop() throws Exception {
+        expectStart();
+        expectStop();
+
+        PowerMock.replayAll();
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        store.start();
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testReloadOnStart() throws Exception {
+        expectStart();
+        expectStop();
+
+        PowerMock.replayAll();
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 1L);
+        endOffsets.put(TP1, 1L);
+        consumer.updateEndOffsets(endOffsets);
+        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
+            @Override
+            public void run() {
+                // Needs to seek to end to find end offsets
+                consumer.waitForPoll(10000);
+
+                // Should keep polling until it reaches current log end offset for all partitions
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE));
+                    }
+                }, 10000);
+
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE));
+                    }
+                }, 10000);
+            }
+        };
+        startConsumerOpsThread.start();
+        store.start();
+        startConsumerOpsThread.join(10000);
+        assertFalse(startConsumerOpsThread.isAlive());
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(2, consumedRecords.size());
+        assertEquals(TP0_VALUE, consumedRecords.get(0).value());
+        assertEquals(TP1_VALUE, consumedRecords.get(1).value());
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendAndReadToEnd() throws Exception {
+        expectStart();
+        TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
+        ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
+        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+        EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future);
+        TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
+        ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY, TP1_VALUE);
+        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
+        EasyMock.expect(producer.send(EasyMock.eq(tp1Record), EasyMock.capture(callback1))).andReturn(tp1Future);
+
+        // Producer flushes when read to log end is called
+        producer.flush();
+        PowerMock.expectLastCall();
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
+            @Override
+            public void run() {
+                // Should keep polling until it has partition info
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.seek(TP0, 0);
+                        consumer.seek(TP1, 0);
+                    }
+                }, 10000);
+            }
+        };
+        startConsumerOpsThread.start();
+        store.start();
+        startConsumerOpsThread.join(10000);
+        assertFalse(startConsumerOpsThread.isAlive());
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+
+        // Set some keys
+        final AtomicInteger invoked = new AtomicInteger(0);
+        org.apache.kafka.clients.producer.Callback producerCallback = new org.apache.kafka.clients.producer.Callback() {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                invoked.incrementAndGet();
+            }
+        };
+        store.send(TP0_KEY, TP0_VALUE, producerCallback);
+        store.send(TP1_KEY, TP1_VALUE, producerCallback);
+        assertEquals(0, invoked.get());
+        tp1Future.resolve((RecordMetadata) null); // Output not used, so safe to not return a real value for testing
+        callback1.getValue().onCompletion(null, null);
+        assertEquals(1, invoked.get());
+        tp0Future.resolve((RecordMetadata) null);
+        callback0.getValue().onCompletion(null, null);
+        assertEquals(2, invoked.get());
+
+        // Now we should have to wait for the records to be read back when we call readToEnd()
+        final CountDownLatch startOffsetUpdateLatch = new CountDownLatch(1);
+        Thread readNewDataThread = new Thread("read-new-data-thread") {
+            @Override
+            public void run() {
+                // Needs to be woken up after calling readToEnd()
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            startOffsetUpdateLatch.await();
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException("Interrupted");
+                        }
+                    }
+                }, 10000);
+
+                // Needs to seek to end to find end offsets
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            startOffsetUpdateLatch.await();
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException("Interrupted");
+                        }
+
+                        Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
+                        newEndOffsets.put(TP0, 2L);
+                        newEndOffsets.put(TP1, 2L);
+                        consumer.updateEndOffsets(newEndOffsets);
+                    }
+                }, 10000);
+
+                // Should keep polling until it reaches current log end offset for all partitions
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY, TP0_VALUE_NEW));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE));
+                    }
+                }, 10000);
+
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY, TP1_VALUE_NEW));
+                    }
+                }, 10000);
+            }
+        };
+        readNewDataThread.start();
+        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
+        FutureCallback<Void> readEndFutureCallback = new FutureCallback<>(new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                assertEquals(4, consumedRecords.size());
+                assertEquals(TP0_VALUE_NEW, consumedRecords.get(2).value());
+                assertEquals(TP1_VALUE_NEW, consumedRecords.get(3).value());
+                getInvokedAndPassed.set(true);
+            }
+        });
+        store.readToEnd(readEndFutureCallback);
+        startOffsetUpdateLatch.countDown();
+        readNewDataThread.join(10000);
+        assertFalse(readNewDataThread.isAlive());
+        readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(getInvokedAndPassed.get());
+
+        // Cleanup
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConsumerError() throws Exception {
+        expectStart();
+        expectStop();
+
+        PowerMock.replayAll();
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 1L);
+        endOffsets.put(TP1, 1L);
+        consumer.updateEndOffsets(endOffsets);
+        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
+            @Override
+            public void run() {
+                // Trigger exception
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.setException(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception());
+                    }
+                }, 10000);
+
+                // Should keep polling until it reaches current log end offset for all partitions
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP0_KEY, TP0_VALUE_NEW));
+                    }
+                }, 10000);
+            }
+        };
+        startConsumerOpsThread.start();
+        store.start();
+        startConsumerOpsThread.join(10000);
+        assertFalse(startConsumerOpsThread.isAlive());
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testProducerError() throws Exception {
+        expectStart();
+        TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
+        ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
+        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+        EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
+            @Override
+            public void run() {
+                // Should keep polling until it has partition info
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.seek(TP0, 0);
+                        consumer.seek(TP1, 0);
+                    }
+                }, 10000);
+            }
+        };
+        startConsumerOpsThread.start();
+        store.start();
+        startConsumerOpsThread.join(10000);
+        assertFalse(startConsumerOpsThread.isAlive());
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+
+        final AtomicReference<Throwable> setException = new AtomicReference<>();
+        store.send(TP0_KEY, TP0_VALUE, new org.apache.kafka.clients.producer.Callback() {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                assertNull(setException.get()); // Should only be invoked once
+                setException.set(exception);
+            }
+        });
+        KafkaException exc = new LeaderNotAvailableException("Error");
+        tp0Future.resolve(exc);
+        callback0.getValue().onCompletion(null, exc);
+        assertNotNull(setException.get());
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+
+    private void expectStart() throws Exception {
+        PowerMock.expectPrivate(store, "createProducer")
+                .andReturn(producer);
+        PowerMock.expectPrivate(store, "createConsumer")
+                .andReturn(consumer);
+    }
+
+    private void expectStop() {
+        producer.close();
+        PowerMock.expectLastCall();
+        // MockConsumer close is checked after test.
+    }
+
+    private static ByteBuffer buffer(String v) {
+        return ByteBuffer.wrap(v.getBytes());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
index 8143c44..c5978ec 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.copycat.util;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -26,20 +27,35 @@ public class TestFuture<T> implements Future<T> {
     private volatile boolean resolved;
     private T result;
     private Throwable exception;
+    private CountDownLatch getCalledLatch;
+
+    private volatile boolean resolveOnGet;
+    private T resolveOnGetResult;
+    private Throwable resolveOnGetException;
 
     public TestFuture() {
         resolved = false;
+        getCalledLatch = new CountDownLatch(1);
+
+        resolveOnGet = false;
+        resolveOnGetResult = null;
+        resolveOnGetException = null;
     }
 
     public void resolve(T val) {
         this.result = val;
         resolved = true;
-
+        synchronized (this) {
+            this.notifyAll();
+        }
     }
 
     public void resolve(Throwable t) {
         exception = t;
         resolved = true;
+        synchronized (this) {
+            this.notifyAll();
+        }
     }
 
     @Override
@@ -59,6 +75,7 @@ public class TestFuture<T> implements Future<T> {
 
     @Override
     public T get() throws InterruptedException, ExecutionException {
+        getCalledLatch.countDown();
         while (true) {
             try {
                 return get(Integer.MAX_VALUE, TimeUnit.DAYS);
@@ -70,12 +87,69 @@ public class TestFuture<T> implements Future<T> {
 
     @Override
     public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-        while (!resolved) {
-            this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
+        getCalledLatch.countDown();
+
+        if (resolveOnGet) {
+            if (resolveOnGetException != null)
+                resolve(resolveOnGetException);
+            else
+                resolve(resolveOnGetResult);
+        }
+
+        synchronized (this) {
+            while (!resolved) {
+                this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
+            }
         }
 
         if (exception != null)
             throw new ExecutionException(exception);
         return result;
     }
+
+    /**
+     * Set a flag to resolve the future as soon as one of the get() methods has been called. Returns immediately.
+     * @param val the value to return from the future
+     */
+    public void resolveOnGet(T val) {
+        resolveOnGet = true;
+        resolveOnGetResult = val;
+    }
+
+    /**
+     * Set a flag to resolve the future as soon as one of the get() methods has been called. Returns immediately.
+     * @param t the exception to return from the future
+     */
+    public void resolveOnGet(Throwable t) {
+        resolveOnGet = true;
+        resolveOnGetException = t;
+    }
+
+    /**
+     * Block, waiting for another thread to call one of the get() methods, and then immediately resolve the future with
+     * the specified value.
+     * @param val the value to return from the future
+     */
+    public void waitForGetAndResolve(T val) {
+        waitForGet();
+        resolve(val);
+    }
+
+    /**
+     * Block, waiting for another thread to call one of the get() methods, and then immediately resolve the future with
+     * the specified value.
+     * @param t the exception to use to resolve the future
+     */
+    public void waitForGetAndResolve(Throwable t) {
+        waitForGet();
+        resolve(t);
+    }
+
+    private void waitForGet() {
+        try {
+            getCalledLatch.await();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Unexpected interruption: ", e);
+        }
+    }
 }
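
The javadoc above describes the new resolveOnGet() and waitForGetAndResolve() helpers. The following is a hypothetical stand-alone snippet, not part of this commit, showing how a test might drive them; TestFutureUsageSketch is an invented name.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.copycat.util.TestFuture;

public class TestFutureUsageSketch {
    public static void main(String[] args) throws Exception {
        // Case 1: resolve the future as soon as the code under test calls get().
        TestFuture<RecordMetadata> success = new TestFuture<>();
        success.resolveOnGet((RecordMetadata) null); // the resolved value is never inspected here
        success.get(10000, TimeUnit.MILLISECONDS);   // returns immediately instead of blocking

        // Case 2: block until another thread calls get(), then fail that get() with an exception.
        final TestFuture<RecordMetadata> failure = new TestFuture<>();
        Thread resolver = new Thread("resolver") {
            @Override
            public void run() {
                failure.waitForGetAndResolve(new KafkaException("simulated failure"));
            }
        };
        resolver.start();
        try {
            failure.get(10000, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            System.out.println("got expected failure: " + e.getCause());
        }
        resolver.join();
    }
}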

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/tests/kafkatest/services/copycat.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/copycat.py b/tests/kafkatest/services/copycat.py
index ea5753b..4e2ab40 100644
--- a/tests/kafkatest/services/copycat.py
+++ b/tests/kafkatest/services/copycat.py
@@ -104,9 +104,10 @@ class CopycatStandaloneService(CopycatServiceBase):
 class CopycatDistributedService(CopycatServiceBase):
     """Runs Copycat in distributed mode."""
 
-    def __init__(self, context, num_nodes, kafka, files, offsets_topic="copycat-offsets"):
+    def __init__(self, context, num_nodes, kafka, files, offsets_topic="copycat-offsets", configs_topic="copycat-configs"):
         super(CopycatDistributedService, self).__init__(context, num_nodes, kafka, files)
         self.offsets_topic = offsets_topic
+        self.configs_topic = configs_topic
 
     def set_configs(self, config_template, connector_config_template):
         """

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/tests/kafkatest/tests/copycat_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/copycat_distributed_test.py b/tests/kafkatest/tests/copycat_distributed_test.py
index 3afcd57..9d00334 100644
--- a/tests/kafkatest/tests/copycat_distributed_test.py
+++ b/tests/kafkatest/tests/copycat_distributed_test.py
@@ -29,6 +29,7 @@ class CopycatDistributedFileTest(KafkaTest):
 
     TOPIC = "test"
     OFFSETS_TOPIC = "copycat-offsets"
+    CONFIG_TOPIC = "copycat-configs"
 
     FIRST_INPUT_LISTS = [["foo", "bar", "baz"], ["foo2", "bar2", "baz2"]]
     FIRST_INPUTS = ["\n".join(input_list) + "\n" for input_list in FIRST_INPUT_LISTS]
@@ -42,8 +43,13 @@ class CopycatDistributedFileTest(KafkaTest):
             'test' : { 'partitions': 1, 'replication-factor': 1 }
         })
 
-        self.source = CopycatDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE])
-        self.sink = CopycatDistributedService(test_context, 2, self.kafka, [self.OUTPUT_FILE])
+        # FIXME these should have multiple nodes. However, currently the connectors are submitted via command line,
+        # which means we would get duplicates. Both would run, but they would have conflicting keys for offsets and
+        # configs. Until we have real distributed coordination of workers with unified connector submission, we need
+        # to restrict each of these to a single node.
+        self.num_nodes = 1
+        self.source = CopycatDistributedService(test_context, self.num_nodes, self.kafka, [self.INPUT_FILE])
+        self.sink = CopycatDistributedService(test_context, self.num_nodes, self.kafka, [self.OUTPUT_FILE])
 
     def test_file_source_and_sink(self, converter="org.apache.kafka.copycat.json.JsonConverter", schemas=True):
         assert converter != None, "converter type must be set"
@@ -62,7 +68,7 @@ class CopycatDistributedFileTest(KafkaTest):
         # Generating data on the source node should generate new records and create new output on the sink node
         for node, input in zip(self.source.nodes, self.FIRST_INPUTS):
             node.account.ssh("echo -e -n " + repr(input) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LISTS), timeout_sec=60, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LISTS[:self.num_nodes]), timeout_sec=60, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
 
         # Restarting both should result in them picking up where they left off,
         # only processing new data.
@@ -71,7 +77,7 @@ class CopycatDistributedFileTest(KafkaTest):
 
         for node, input in zip(self.source.nodes, self.SECOND_INPUTS):
             node.account.ssh("echo -e -n " + repr(input) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LISTS + self.SECOND_INPUT_LISTS), timeout_sec=60, err_msg="Sink output file never converged to the same state as the input file")
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LISTS[:self.num_nodes] + self.SECOND_INPUT_LISTS[:self.num_nodes]), timeout_sec=60, err_msg="Sink output file never converged to the same state as the input file")
 
     def validate_output(self, inputs):
         try:

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/tests/kafkatest/tests/templates/copycat-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-distributed.properties b/tests/kafkatest/tests/templates/copycat-distributed.properties
index da47423..31f9901 100644
--- a/tests/kafkatest/tests/templates/copycat-distributed.properties
+++ b/tests/kafkatest/tests/templates/copycat-distributed.properties
@@ -24,9 +24,10 @@ key.converter.schemas.enable={{ schemas|default(True)|string|lower }}
 value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
 {% endif %}
 
-offset.key.converter=org.apache.kafka.copycat.json.JsonConverter
-offset.value.converter=org.apache.kafka.copycat.json.JsonConverter
-offset.key.converter.schemas.enable=false
-offset.value.converter.schemas.enable=false
+internal.key.converter=org.apache.kafka.copycat.json.JsonConverter
+internal.value.converter=org.apache.kafka.copycat.json.JsonConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
 
 offset.storage.topic={{ OFFSETS_TOPIC }}
+config.storage.topic={{ CONFIG_TOPIC }}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/tests/kafkatest/tests/templates/copycat-standalone.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-standalone.properties b/tests/kafkatest/tests/templates/copycat-standalone.properties
index 39db6ce..c89490a 100644
--- a/tests/kafkatest/tests/templates/copycat-standalone.properties
+++ b/tests/kafkatest/tests/templates/copycat-standalone.properties
@@ -24,9 +24,9 @@ key.converter.schemas.enable={{ schemas|default(True)|string|lower }}
 value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
 {% endif %}
 
-offset.key.converter=org.apache.kafka.copycat.json.JsonConverter
-offset.value.converter=org.apache.kafka.copycat.json.JsonConverter
-offset.key.converter.schemas.enable=false
-offset.value.converter.schemas.enable=false
+internal.key.converter=org.apache.kafka.copycat.json.JsonConverter
+internal.value.converter=org.apache.kafka.copycat.json.JsonConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
 
 offset.storage.file.filename={{ OFFSETS_FILE }}

