kafka-commits mailing list archives

From gwens...@apache.org
Subject [1/2] kafka git commit: KAFKA-2373: Add Kafka-backed offset storage for Copycat.
Date Fri, 25 Sep 2015 02:01:21 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk bcf374da9 -> 48b4d6938


http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java
new file mode 100644
index 0000000..6a3eec3
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java
@@ -0,0 +1,458 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.TestFuture;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.apache.kafka.copycat.util.ByteArrayProducerRecordEquals.eqProducerRecord;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaOffsetBackingStore.class)
+@PowerMockIgnore("javax.management.*")
+public class KafkaOffsetBackingStoreTest {
+    private static final String TOPIC = "copycat-offsets";
+    private static final TopicPartition TP0 = new TopicPartition(TOPIC, 0);
+    private static final TopicPartition TP1 = new TopicPartition(TOPIC, 1);
+    private static final Map<String, String> DEFAULT_PROPS = new HashMap<>();
+    static {
+        DEFAULT_PROPS.put(KafkaOffsetBackingStore.OFFSET_STORAGE_TOPIC_CONFIG, TOPIC);
+        DEFAULT_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+    }
+    private static final Set<TopicPartition> CONSUMER_ASSIGNMENT = new HashSet<>(Arrays.asList(TP0, TP1));
+    private static final Map<ByteBuffer, ByteBuffer> FIRST_SET = new HashMap<>();
+    static {
+        FIRST_SET.put(buffer("key"), buffer("value"));
+        FIRST_SET.put(null, null);
+    }
+
+
+    private static final Node LEADER = new Node(1, "broker1", 9092);
+    private static final Node REPLICA = new Node(1, "broker2", 9093);
+
+    private static final PartitionInfo TPINFO0 = new PartitionInfo(TOPIC, 0, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
+    private static final PartitionInfo TPINFO1 = new PartitionInfo(TOPIC, 1, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
+
+    private static final ByteBuffer TP0_KEY = buffer("TP0KEY");
+    private static final ByteBuffer TP1_KEY = buffer("TP1KEY");
+    private static final ByteBuffer TP0_VALUE = buffer("VAL0");
+    private static final ByteBuffer TP1_VALUE = buffer("VAL1");
+    private static final ByteBuffer TP0_VALUE_NEW = buffer("VAL0_NEW");
+    private static final ByteBuffer TP1_VALUE_NEW = buffer("VAL1_NEW");
+
+    private KafkaOffsetBackingStore store;
+
+    @Mock private KafkaProducer<byte[], byte[]> producer;
+    private MockConsumer<byte[], byte[]> consumer;
+
+    @Before
+    public void setUp() throws Exception {
+        store = PowerMock.createPartialMockAndInvokeDefaultConstructor(KafkaOffsetBackingStore.class, new String[]{"createConsumer", "createProducer"});
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));
+        Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(TP0, 0L);
+        beginningOffsets.put(TP1, 0L);
+        consumer.updateBeginningOffsets(beginningOffsets);
+    }
+
+    @Test(expected = CopycatException.class)
+    public void testMissingTopic() {
+        store = new KafkaOffsetBackingStore();
+        store.configure(Collections.<String, Object>emptyMap());
+    }
+
+    @Test
+    public void testStartStop() throws Exception {
+        expectStart();
+        expectStop();
+
+        PowerMock.replayAll();
+
+        store.configure(DEFAULT_PROPS);
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        store.start();
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testReloadOnStart() throws Exception {
+        expectStart();
+        expectStop();
+
+        PowerMock.replayAll();
+
+        store.configure(DEFAULT_PROPS);
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 1L);
+        endOffsets.put(TP1, 1L);
+        consumer.updateEndOffsets(endOffsets);
+        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
+            @Override
+            public void run() {
+                // Needs to seek to end to find end offsets
+                consumer.waitForPoll(10000);
+
+                // Should keep polling until it reaches current log end offset for all partitions
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()));
+                    }
+                }, 10000);
+
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()));
+                    }
+                }, 10000);
+            }
+        };
+        startConsumerOpsThread.start();
+        store.start();
+        startConsumerOpsThread.join(10000);
+        assertFalse(startConsumerOpsThread.isAlive());
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
+        assertEquals(TP0_VALUE, data.get(TP0_KEY));
+        assertEquals(TP1_VALUE, data.get(TP1_KEY));
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testGetSet() throws Exception {
+        expectStart();
+        TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
+        ProducerRecord<byte[], byte[]> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY.array(), TP0_VALUE.array());
+        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
+        EasyMock.expect(producer.send(eqProducerRecord(tp0Record), EasyMock.capture(callback1))).andReturn(tp0Future);
+        TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
+        ProducerRecord<byte[], byte[]> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY.array(), TP1_VALUE.array());
+        Capture<org.apache.kafka.clients.producer.Callback> callback2 = EasyMock.newCapture();
+        EasyMock.expect(producer.send(eqProducerRecord(tp1Record), EasyMock.capture(callback2))).andReturn(tp1Future);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        store.configure(DEFAULT_PROPS);
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
+            @Override
+            public void run() {
+                // Should keep polling until it has partition info
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.seek(TP0, 0);
+                        consumer.seek(TP1, 0);
+                    }
+                }, 10000);
+            }
+        };
+        startConsumerOpsThread.start();
+        store.start();
+        startConsumerOpsThread.join(10000);
+        assertFalse(startConsumerOpsThread.isAlive());
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+
+        Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
+        toSet.put(TP0_KEY, TP0_VALUE);
+        toSet.put(TP1_KEY, TP1_VALUE);
+        final AtomicBoolean invoked = new AtomicBoolean(false);
+        Future<Void> setFuture = store.set(toSet, new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                invoked.set(true);
+            }
+        });
+        assertFalse(setFuture.isDone());
+        tp1Future.resolve((RecordMetadata) null); // Output not used, so safe to not return a real value for testing
+        assertFalse(setFuture.isDone());
+        tp0Future.resolve((RecordMetadata) null);
+        // Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
+        // for the store's set callback
+        callback2.getValue().onCompletion(null, null);
+        assertFalse(invoked.get());
+        callback1.getValue().onCompletion(null, null);
+        setFuture.get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(invoked.get());
+
+        // Getting data should continue to return old data...
+        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
+        store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
+            @Override
+            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
+                // Since we didn't read them yet, these will be null
+                assertEquals(null, result.get(TP0_KEY));
+                assertEquals(null, result.get(TP1_KEY));
+                getInvokedAndPassed.set(true);
+            }
+        }).get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(getInvokedAndPassed.get());
+
+        // Until the consumer gets the new data
+        Thread readNewDataThread = new Thread("read-new-data-thread") {
+            @Override
+            public void run() {
+                // Should keep polling until it reaches current log end offset for all partitions
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()));
+                    }
+                }, 10000);
+
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE_NEW.array()));
+                    }
+                }, 10000);
+            }
+        };
+        readNewDataThread.start();
+        readNewDataThread.join(10000);
+        assertFalse(readNewDataThread.isAlive());
+
+        // And now the new data should be returned
+        final AtomicBoolean finalGetInvokedAndPassed = new AtomicBoolean(false);
+        store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
+            @Override
+            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
+                assertEquals(TP0_VALUE_NEW, result.get(TP0_KEY));
+                assertEquals(TP1_VALUE_NEW, result.get(TP1_KEY));
+                finalGetInvokedAndPassed.set(true);
+            }
+        }).get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(finalGetInvokedAndPassed.get());
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConsumerError() throws Exception {
+        expectStart();
+        expectStop();
+
+        PowerMock.replayAll();
+
+        store.configure(DEFAULT_PROPS);
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 1L);
+        endOffsets.put(TP1, 1L);
+        consumer.updateEndOffsets(endOffsets);
+        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
+            @Override
+            public void run() {
+                // Trigger exception
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.setException(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception());
+                    }
+                }, 10000);
+
+                // Should keep polling until it reaches current log end offset for all partitions
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()));
+                    }
+                }, 10000);
+            }
+        };
+        startConsumerOpsThread.start();
+        store.start();
+        startConsumerOpsThread.join(10000);
+        assertFalse(startConsumerOpsThread.isAlive());
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testProducerError() throws Exception {
+        expectStart();
+        TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
+        ProducerRecord<byte[], byte[]> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY.array(), TP0_VALUE.array());
+        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
+        EasyMock.expect(producer.send(eqProducerRecord(tp0Record), EasyMock.capture(callback1))).andReturn(tp0Future);
+        TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
+        ProducerRecord<byte[], byte[]> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY.array(), TP1_VALUE.array());
+        Capture<org.apache.kafka.clients.producer.Callback> callback2 = EasyMock.newCapture();
+        EasyMock.expect(producer.send(eqProducerRecord(tp1Record), EasyMock.capture(callback2))).andReturn(tp1Future);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        store.configure(DEFAULT_PROPS);
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
+            @Override
+            public void run() {
+                // Should keep polling until it has partition info
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.seek(TP0, 0);
+                        consumer.seek(TP1, 0);
+                    }
+                }, 10000);
+            }
+        };
+        startConsumerOpsThread.start();
+        store.start();
+        startConsumerOpsThread.join(10000);
+        assertFalse(startConsumerOpsThread.isAlive());
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+
+        Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
+        toSet.put(TP0_KEY, TP0_VALUE);
+        toSet.put(TP1_KEY, TP1_VALUE);
+        final AtomicReference<Throwable> setException = new AtomicReference<>();
+        Future<Void> setFuture = store.set(toSet, new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                assertNull(setException.get()); // Should only be invoked once
+                setException.set(error);
+            }
+        });
+        assertFalse(setFuture.isDone());
+        KafkaException exc = new LeaderNotAvailableException("Error");
+        tp1Future.resolve(exc);
+        callback2.getValue().onCompletion(null, exc);
+        // One failure should resolve the future immediately
+        try {
+            setFuture.get(10000, TimeUnit.MILLISECONDS);
+            fail("Should have see ExecutionException");
+        } catch (ExecutionException e) {
+            // expected
+        }
+        assertNotNull(setException.get());
+
+        // Callbacks can continue to arrive
+        tp0Future.resolve((RecordMetadata) null);
+        callback1.getValue().onCompletion(null, null);
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+
+    private void expectStart() throws Exception {
+        PowerMock.expectPrivate(store, "createProducer")
+                .andReturn(producer);
+        PowerMock.expectPrivate(store, "createConsumer")
+                .andReturn(consumer);
+    }
+
+    private void expectStop() {
+        producer.close();
+        PowerMock.expectLastCall();
+        // MockConsumer close is checked after test.
+    }
+
+    private static ByteBuffer buffer(String v) {
+        return ByteBuffer.wrap(v.getBytes());
+    }
+
+}
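
For readers following the testGetSet and testProducerError flows above: the behaviour those tests verify is that a set() future completes only after every underlying producer send has succeeded, and fails as soon as any send fails, regardless of callback ordering. Below is a minimal, hypothetical Java sketch of that aggregation pattern; it is not the actual KafkaOffsetBackingStore implementation, and the class and method names are invented for illustration.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch only; the real store's set() future is implemented differently.
class AggregatedSetFuture {
    private final CountDownLatch remaining;
    private final AtomicReference<Throwable> firstError = new AtomicReference<>();

    AggregatedSetFuture(int outstandingSends) {
        this.remaining = new CountDownLatch(outstandingSends);
    }

    // Called from each producer send callback, possibly out of order.
    void onSendCompletion(Throwable error) {
        if (error != null) {
            firstError.compareAndSet(null, error);
            // Fail fast: drain the latch so waiters return immediately.
            while (remaining.getCount() > 0)
                remaining.countDown();
        } else {
            remaining.countDown();
        }
    }

    // Analogous to setFuture.get(10000, TimeUnit.MILLISECONDS) in testGetSet/testProducerError.
    void await(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (!remaining.await(timeout, unit))
            throw new TimeoutException();
        Throwable error = firstError.get();
        if (error != null)
            throw new ExecutionException(error);
    }
}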

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
index 956d064..e33ecd0 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
@@ -31,7 +31,9 @@ import org.powermock.api.easymock.annotation.Mock;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.*;
 
@@ -43,6 +45,7 @@ public class OffsetStorageWriterTest {
     private static final String NAMESPACE = "namespace";
     // Copycat format - any types should be accepted here
     private static final Map<String, String> OFFSET_KEY = Collections.singletonMap("key",
"key");
+    private static final List<Object> OFFSET_KEY_WRAPPED = Arrays.asList(NAMESPACE,
OFFSET_KEY);
     private static final Map<String, Integer> OFFSET_VALUE = Collections.singletonMap("key",
12);
 
     // Serialized
@@ -195,12 +198,11 @@ public class OffsetStorageWriterTest {
     private void expectStore(final Callback<Void> callback,
                              final boolean fail,
                              final CountDownLatch waitForCompletion) {
-        EasyMock.expect(keyConverter.fromCopycatData(NAMESPACE, null, OFFSET_KEY)).andReturn(OFFSET_KEY_SERIALIZED);
+        EasyMock.expect(keyConverter.fromCopycatData(NAMESPACE, null, OFFSET_KEY_WRAPPED)).andReturn(OFFSET_KEY_SERIALIZED);
         EasyMock.expect(valueConverter.fromCopycatData(NAMESPACE, null, OFFSET_VALUE)).andReturn(OFFSET_VALUE_SERIALIZED);
 
         final Capture<Callback<Void>> storeCallback = Capture.newInstance();
-        EasyMock.expect(store.set(EasyMock.eq(NAMESPACE), EasyMock.eq(OFFSETS_SERIALIZED),
-                EasyMock.capture(storeCallback)))
+        EasyMock.expect(store.set(EasyMock.eq(OFFSETS_SERIALIZED), EasyMock.capture(storeCallback)))
                 .andAnswer(new IAnswer<Future<Void>>() {
                     @Override
                     public Future<Void> answer() throws Throwable {

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ByteArrayProducerRecordEquals.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ByteArrayProducerRecordEquals.java
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ByteArrayProducerRecordEquals.java
new file mode 100644
index 0000000..929ea85
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ByteArrayProducerRecordEquals.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.easymock.EasyMock;
+import org.easymock.IArgumentMatcher;
+
+import java.util.Arrays;
+
+public class ByteArrayProducerRecordEquals implements IArgumentMatcher {
+    private ProducerRecord<byte[], byte[]> record;
+
+    public static ProducerRecord<byte[], byte[]> eqProducerRecord(ProducerRecord<byte[], byte[]> in) {
+        EasyMock.reportMatcher(new ByteArrayProducerRecordEquals(in));
+        return null;
+    }
+
+    public ByteArrayProducerRecordEquals(ProducerRecord<byte[], byte[]> record) {
+        this.record = record;
+    }
+
+    @Override
+    public boolean matches(Object argument) {
+        if (!(argument instanceof ProducerRecord))
+            return false;
+        ProducerRecord<byte[], byte[]> other = (ProducerRecord<byte[], byte[]>) argument;
+        return record.topic().equals(other.topic()) &&
+                (record.partition() != null ? record.partition().equals(other.partition()) : other.partition() == null) &&
+                (record.key() != null ? Arrays.equals(record.key(), other.key()) : other.key() == null) &&
+                (record.value() != null ? Arrays.equals(record.value(), other.value()) : other.value() == null);
+    }
+
+    @Override
+    public void appendTo(StringBuffer buffer) {
+        buffer.append(record.toString());
+    }
+}
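
EasyMock matches arguments with equals() by default, and byte arrays only compare equal by reference, so the test above registers this matcher through eqProducerRecord() to compare record contents instead. A small, hypothetical usage sketch (the demo class is not part of the commit; it assumes the matcher class above is on the test classpath):

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.copycat.util.ByteArrayProducerRecordEquals;

// Hypothetical demo class, not part of the commit.
public class MatcherDemo {
    public static void main(String[] args) {
        // Two records with equal contents but distinct byte[] instances.
        ProducerRecord<byte[], byte[]> expected =
                new ProducerRecord<>("copycat-offsets", "key".getBytes(), "value".getBytes());
        ProducerRecord<byte[], byte[]> actual =
                new ProducerRecord<>("copycat-offsets", "key".getBytes(), "value".getBytes());
        // The matcher compares topic, partition, and byte[] contents rather than references.
        System.out.println(new ByteArrayProducerRecordEquals(expected).matches(actual)); // true
    }
}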

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
new file mode 100644
index 0000000..8143c44
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class TestFuture<T> implements Future<T> {
+    private volatile boolean resolved;
+    private T result;
+    private Throwable exception;
+
+    public TestFuture() {
+        resolved = false;
+    }
+
+    public synchronized void resolve(T val) {
+        this.result = val;
+        resolved = true;
+        this.notifyAll();
+    }
+
+    public synchronized void resolve(Throwable t) {
+        exception = t;
+        resolved = true;
+        this.notifyAll();
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return false;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return false;
+    }
+
+    @Override
+    public boolean isDone() {
+        return resolved;
+    }
+
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        while (true) {
+            try {
+                return get(Integer.MAX_VALUE, TimeUnit.DAYS);
+            } catch (TimeoutException e) {
+                // ignore
+            }
+        }
+    }
+
+    @Override
+    public synchronized T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        while (!resolved) {
+            this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
+        }
+
+        if (exception != null)
+            throw new ExecutionException(exception);
+        return result;
+    }
+}
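
TestFuture lets the offset store tests above (e.g. testGetSet) control exactly when each mocked producer send completes. A minimal, hypothetical usage sketch (the demo class is not part of the commit):

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.copycat.util.TestFuture;

// Hypothetical demo class, not part of the commit.
public class TestFutureDemo {
    public static void main(String[] args) {
        // The test hands a TestFuture to the code under test in place of a real producer future,
        // then completes it by hand once it wants the "send" to finish.
        TestFuture<RecordMetadata> sendFuture = new TestFuture<>();
        System.out.println(sendFuture.isDone());   // false until resolve() is called
        sendFuture.resolve((RecordMetadata) null); // success; resolve(Throwable) would make get() throw ExecutionException
        System.out.println(sendFuture.isDone());   // true
    }
}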

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/copycat/runtime/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/resources/log4j.properties b/copycat/runtime/src/test/resources/log4j.properties
new file mode 100644
index 0000000..d5e90fe
--- /dev/null
+++ b/copycat/runtime/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+log4j.rootLogger=OFF, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
+
+log4j.logger.org.apache.kafka=ERROR

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/tests/kafkatest/services/copycat.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/copycat.py b/tests/kafkatest/services/copycat.py
index 7452b85..ea5753b 100644
--- a/tests/kafkatest/services/copycat.py
+++ b/tests/kafkatest/services/copycat.py
@@ -18,8 +18,8 @@ from ducktape.utils.util import wait_until
 import subprocess, signal
 
 
-class CopycatStandaloneService(Service):
-    """Runs Copycat in standalone mode."""
+class CopycatServiceBase(Service):
+    """Base class for Copycat services providing some common settings and functionality"""
 
     logs = {
         "kafka_log": {
@@ -27,16 +27,55 @@ class CopycatStandaloneService(Service):
             "collect_default": True},
     }
 
-    def __init__(self, context, kafka, files):
-        super(CopycatStandaloneService, self).__init__(context, 1)
+    def __init__(self, context, num_nodes, kafka, files):
+        super(CopycatServiceBase, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.files = files
 
+    def pids(self, node):
+        """Return process ids for Copycat processes."""
+        try:
+            return [pid for pid in node.account.ssh_capture("cat /mnt/copycat.pid", callback=int)]
+        except:
+            return []
+
+    def stop_node(self, node, clean_shutdown=True):
+        pids = self.pids(node)
+        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
+
+        for pid in pids:
+            node.account.signal(pid, sig, allow_fail=False)
+        for pid in pids:
+            wait_until(lambda: not node.account.alive(pid), timeout_sec=10, err_msg="Copycat
standalone process took too long to exit")
+
+        node.account.ssh("rm -f /mnt/copycat.pid", allow_fail=False)
+
+    def restart(self):
+        # We don't want to do any clean up here, just restart the process
+        for node in self.nodes:
+            self.stop_node(node)
+            self.start_node(node)
+
+    def clean_node(self, node):
+        if len(self.pids(node)) > 0:
+            self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..."
%
+                             (self.__class__.__name__, node.account))
+        for pid in self.pids(node):
+            node.account.signal(pid, signal.SIGKILL, allow_fail=False)
+        node.account.ssh("rm -rf /mnt/copycat.pid /mnt/copycat.log /mnt/copycat.properties
/mnt/copycat-connector.properties " + " ".join(self.files), allow_fail=False)
+
+
+class CopycatStandaloneService(CopycatServiceBase):
+    """Runs Copycat in standalone mode."""
+
+    def __init__(self, context, kafka, files):
+        super(CopycatStandaloneService, self).__init__(context, 1, kafka, files)
+
     def set_configs(self, config_template, connector_config_template):
         """
-        Set configurations for the standalone worker and the connector to run on
-        it. These are not provided in the constructor because at least the
-        standalone worker config generally needs access to ZK/Kafka services to
+        Set configurations for the worker and the connector to run on
+        it. These are not provided in the constructor because the worker
+        config generally needs access to ZK/Kafka services to
         create the configuration.
         """
         self.config_template = config_template
@@ -47,9 +86,6 @@ class CopycatStandaloneService(Service):
     def node(self):
         return self.nodes[0]
 
-    def start(self):
-        super(CopycatStandaloneService, self).start()
-
     def start_node(self, node):
         node.account.create_file("/mnt/copycat.properties", self.config_template)
         node.account.create_file("/mnt/copycat-connector.properties", self.connector_config_template)
@@ -60,36 +96,38 @@ class CopycatStandaloneService(Service):
                              "1>> /mnt/copycat.log 2>> /mnt/copycat.log &
echo $! > /mnt/copycat.pid")
             monitor.wait_until('Copycat started', timeout_sec=10, err_msg="Never saw message
indicating Copycat finished startup")
 
-        if len(self.pids()) == 0:
+        if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")
 
-    def pids(self):
-        """Return process ids for Copycat processes."""
-        try:
-            return [pid for pid in self.node.account.ssh_capture("cat /mnt/copycat.pid",
callback=int)]
-        except:
-            return []
 
-    def stop_node(self, node, clean_shutdown=True):
-        pids = self.pids()
-        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
 
-        for pid in pids:
-            self.node.account.signal(pid, sig, allow_fail=False)
-        for pid in pids:
-            wait_until(lambda: not self.node.account.alive(pid), timeout_sec=10, err_msg="Copycat
standalone process took too long to exit")
+class CopycatDistributedService(CopycatServiceBase):
+    """Runs Copycat in distributed mode."""
 
-        node.account.ssh("rm -f /mnt/copycat.pid", allow_fail=False)
+    def __init__(self, context, num_nodes, kafka, files, offsets_topic="copycat-offsets"):
+        super(CopycatDistributedService, self).__init__(context, num_nodes, kafka, files)
+        self.offsets_topic = offsets_topic
 
-    def restart(self):
-        # We don't want to do any clean up here, just restart the process
-        self.stop_node(self.node)
-        self.start_node(self.node)
+    def set_configs(self, config_template, connector_config_template):
+        """
+        Set configurations for the worker and the connector to run on
+        it. These are not provided in the constructor because the worker
+        config generally needs access to ZK/Kafka services to
+        create the configuration.
+        """
+        self.config_template = config_template
+        self.connector_config_template = connector_config_template
+
+    def start_node(self, node):
+        node.account.create_file("/mnt/copycat.properties", self.config_template)
+        node.account.create_file("/mnt/copycat-connector.properties", self.connector_config_template)
+
+        self.logger.info("Starting Copycat standalone process")
+        with node.account.monitor_log("/mnt/copycat.log") as monitor:
+            node.account.ssh("/opt/kafka/bin/copycat-distributed.sh /mnt/copycat.properties
/mnt/copycat-connector.properties " +
+                             "1>> /mnt/copycat.log 2>> /mnt/copycat.log &
echo $! > /mnt/copycat.pid")
+            monitor.wait_until('Copycat started', timeout_sec=10, err_msg="Never saw message
indicating Copycat finished startup")
+
+        if len(self.pids(node)) == 0:
+            raise RuntimeError("No process ids recorded")
 
-    def clean_node(self, node):
-        if len(self.pids()) > 0:
-            self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..."
%
-                             (self.__class__.__name__, node.account))
-        for pid in self.pids():
-            self.node.account.signal(pid, signal.SIGKILL, allow_fail=False)
-        node.account.ssh("rm -rf /mnt/copycat.pid /mnt/copycat.log /mnt/copycat.properties
/mnt/copycat-connector.properties " + " ".join(self.files), allow_fail=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/tests/kafkatest/tests/copycat_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/copycat_distributed_test.py b/tests/kafkatest/tests/copycat_distributed_test.py
new file mode 100644
index 0000000..3afcd57
--- /dev/null
+++ b/tests/kafkatest/tests/copycat_distributed_test.py
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.copycat import CopycatDistributedService
+from ducktape.utils.util import wait_until
+import hashlib, subprocess, json, itertools
+
+class CopycatDistributedFileTest(KafkaTest):
+    """
+    Simple test of Copycat in distributed mode, producing data from files on one Copycat cluster
+    and consuming it on another, validating the total output is identical to the input.
+    """
+
+    INPUT_FILE = "/mnt/copycat.input"
+    OUTPUT_FILE = "/mnt/copycat.output"
+
+    TOPIC = "test"
+    OFFSETS_TOPIC = "copycat-offsets"
+
+    FIRST_INPUT_LISTS = [["foo", "bar", "baz"], ["foo2", "bar2", "baz2"]]
+    FIRST_INPUTS = ["\n".join(input_list) + "\n" for input_list in FIRST_INPUT_LISTS]
+    SECOND_INPUT_LISTS = [["razz", "ma", "tazz"], ["razz2", "ma2", "tazz2"]]
+    SECOND_INPUTS = ["\n".join(input_list) + "\n" for input_list in SECOND_INPUT_LISTS]
+
+    SCHEMA = { "type": "string", "optional": False }
+
+    def __init__(self, test_context):
+        super(CopycatDistributedFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            'test' : { 'partitions': 1, 'replication-factor': 1 }
+        })
+
+        self.source = CopycatDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE])
+        self.sink = CopycatDistributedService(test_context, 2, self.kafka, [self.OUTPUT_FILE])
+
+    def test_file_source_and_sink(self, converter="org.apache.kafka.copycat.json.JsonConverter", schemas=True):
+        assert converter != None, "converter type must be set"
+        # Template parameters
+        self.key_converter = converter
+        self.value_converter = converter
+        self.schemas = schemas
+
+        # These need to be set
+        self.source.set_configs(self.render("copycat-distributed.properties"), self.render("copycat-file-source.properties"))
+        self.sink.set_configs(self.render("copycat-distributed.properties"), self.render("copycat-file-sink.properties"))
+
+        self.source.start()
+        self.sink.start()
+
+        # Generating data on the source nodes should generate new records and create new output on the sink nodes
+        for node, input in zip(self.source.nodes, self.FIRST_INPUTS):
+            node.account.ssh("echo -e -n " + repr(input) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LISTS), timeout_sec=60, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
+
+        # Restarting both should result in them picking up where they left off,
+        # only processing new data.
+        self.source.restart()
+        self.sink.restart()
+
+        for node, input in zip(self.source.nodes, self.SECOND_INPUTS):
+            node.account.ssh("echo -e -n " + repr(input) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LISTS + self.SECOND_INPUT_LISTS), timeout_sec=60, err_msg="Sink output file never converged to the same state as the input file")
+
+    def validate_output(self, inputs):
+        try:
+            input_set = set(itertools.chain(*inputs))
+            output_set = set(itertools.chain(*[
+                [line.strip() for line in node.account.ssh_capture("cat " + self.OUTPUT_FILE)]
for node in self.sink.nodes
+            ]))
+            return input_set == output_set
+        except subprocess.CalledProcessError:
+            return False

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/tests/kafkatest/tests/copycat_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/copycat_test.py b/tests/kafkatest/tests/copycat_test.py
index b4adf53..1bd8ccb 100644
--- a/tests/kafkatest/tests/copycat_test.py
+++ b/tests/kafkatest/tests/copycat_test.py
@@ -53,7 +53,7 @@ class CopycatStandaloneFileTest(KafkaTest):
     @parametrize(converter="org.apache.kafka.copycat.json.JsonConverter", schemas=True)
     @parametrize(converter="org.apache.kafka.copycat.json.JsonConverter", schemas=False)
     @parametrize(converter="org.apache.kafka.copycat.storage.StringConverter", schemas=None)
-    def test_file_source_and_sink(self, converter="org.apache.kafka.json.JsonConverter",
schemas=True):
+    def test_file_source_and_sink(self, converter="org.apache.kafka.copycat.json.JsonConverter",
schemas=True):
         assert converter != None, "converter type must be set"
         # Template parameters
         self.key_converter = converter

http://git-wip-us.apache.org/repos/asf/kafka/blob/48b4d693/tests/kafkatest/tests/templates/copycat-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-distributed.properties b/tests/kafkatest/tests/templates/copycat-distributed.properties
new file mode 100644
index 0000000..da47423
--- /dev/null
+++ b/tests/kafkatest/tests/templates/copycat-distributed.properties
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bootstrap.servers={{ kafka.bootstrap_servers() }}
+
+key.converter={{ key_converter|default("org.apache.kafka.copycat.json.JsonConverter") }}
+value.converter={{ value_converter|default("org.apache.kafka.copycat.json.JsonConverter") }}
+{% if key_converter is not defined or key_converter.endswith("JsonConverter") %}
+key.converter.schemas.enable={{ schemas|default(True)|string|lower }}
+{% endif %}
+{% if value_converter is not defined or value_converter.endswith("JsonConverter") %}
+value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
+{% endif %}
+
+offset.key.converter=org.apache.kafka.copycat.json.JsonConverter
+offset.value.converter=org.apache.kafka.copycat.json.JsonConverter
+offset.key.converter.schemas.enable=false
+offset.value.converter.schemas.enable=false
+
+offset.storage.topic={{ OFFSETS_TOPIC }}
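
The bootstrap.servers and offset.storage.topic keys in the template above are the same two settings the KafkaOffsetBackingStore test earlier supplies through DEFAULT_PROPS. As a rough, hypothetical illustration only (placeholder values, demo class not part of the commit), an equivalent configuration map built in Java might look like this:

import java.util.HashMap;
import java.util.Map;

// Hypothetical configuration sketch mirroring DEFAULT_PROPS in KafkaOffsetBackingStoreTest.
public class OffsetStoreConfigExample {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("offset.storage.topic", "copycat-offsets");        // topic backing the offset store
        props.put("bootstrap.servers", "broker1:9092,broker2:9093"); // Kafka cluster to read/write offsets
        // A KafkaOffsetBackingStore would be configure()d with a map like this and then start()ed.
        System.out.println(props);
    }
}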

