kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject [4/7] kafka git commit: KAFKA-2366; Initial patch for Copycat
Date Fri, 14 Aug 2015 23:01:08 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
new file mode 100644
index 0000000..e41364e
--- /dev/null
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.file;
+
+import org.apache.kafka.copycat.connector.Task;
+import org.apache.kafka.copycat.sink.SinkConnector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Very simple connector that works with the console. This connector supports both source and
+ * sink modes via its 'mode' setting.
+ */
+public class FileStreamSinkConnector extends SinkConnector {
+    public static final String FILE_CONFIG = "file";
+
+    private String filename;
+
+    @Override
+    public void start(Properties props) {
+        filename = props.getProperty(FILE_CONFIG);
+    }
+
+    @Override
+    public Class<? extends Task> getTaskClass() {
+        return FileStreamSinkTask.class;
+    }
+
+    @Override
+    public List<Properties> getTaskConfigs(int maxTasks) {
+        ArrayList<Properties> configs = new ArrayList<>();
+        for (int i = 0; i < maxTasks; i++) {
+            Properties config = new Properties();
+            if (filename != null)
+                config.setProperty(FILE_CONFIG, filename);
+            configs.add(config);
+        }
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+        // Nothing to do since FileStreamSinkConnector has no background monitoring.
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
new file mode 100644
index 0000000..7e4ca7e
--- /dev/null
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.file;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.sink.SinkRecord;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * FileStreamSinkTask writes records to stdout or a file.
+ */
+public class FileStreamSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(FileStreamSinkTask.class);
+
+    private PrintStream outputStream;
+
+    public FileStreamSinkTask() {
+    }
+
+    // for testing
+    public FileStreamSinkTask(PrintStream outputStream) {
+        this.outputStream = outputStream;
+    }
+
+    @Override
+    public void start(Properties props) {
+        String filename = props.getProperty(FileStreamSinkConnector.FILE_CONFIG);
+        if (filename == null) {
+            outputStream = System.out;
+        } else {
+            try {
+                outputStream = new PrintStream(new File(filename));
+            } catch (FileNotFoundException e) {
+                throw new CopycatException("Couldn't find or create file for FileStreamSinkTask: {}", e);
+            }
+        }
+    }
+
+    @Override
+    public void put(Collection<SinkRecord> sinkRecords) {
+        for (SinkRecord record : sinkRecords) {
+            outputStream.println(record.getValue());
+        }
+    }
+
+    @Override
+    public void flush(Map<TopicPartition, Long> offsets) {
+        outputStream.flush();
+    }
+
+    @Override
+    public void stop() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
new file mode 100644
index 0000000..4f9d8d0
--- /dev/null
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.file;
+
+import org.apache.kafka.copycat.connector.Task;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.source.SourceConnector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Very simple connector that works with the console. This connector supports both source and
+ * sink modes via its 'mode' setting.
+ */
+public class FileStreamSourceConnector extends SourceConnector {
+    public static final String TOPIC_CONFIG = "topic";
+    public static final String FILE_CONFIG = "file";
+
+    private String filename;
+    private String topic;
+
+    @Override
+    public void start(Properties props) {
+        filename = props.getProperty(FILE_CONFIG);
+        topic = props.getProperty(TOPIC_CONFIG);
+        if (topic == null || topic.isEmpty())
+            throw new CopycatException("FileStreamSourceConnector configuration must include 'topic' setting");
+        if (topic.contains(","))
+            throw new CopycatException("FileStreamSourceConnector should only have a single topic when used as a source.");
+    }
+
+    @Override
+    public Class<? extends Task> getTaskClass() {
+        return FileStreamSourceTask.class;
+    }
+
+    @Override
+    public List<Properties> getTaskConfigs(int maxTasks) {
+        ArrayList<Properties> configs = new ArrayList<>();
+        // Only one input stream makes sense.
+        Properties config = new Properties();
+        if (filename != null)
+            config.setProperty(FILE_CONFIG, filename);
+        config.setProperty(TOPIC_CONFIG, topic);
+        configs.add(config);
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+        // Nothing to do since FileStreamSourceConnector has no background monitoring.
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
new file mode 100644
index 0000000..572ae1f
--- /dev/null
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.file;
+
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.source.SourceRecord;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * FileStreamSourceTask reads from stdin or a file.
+ */
+public class FileStreamSourceTask extends SourceTask {
+    private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class);
+
+    private InputStream stream;
+    private BufferedReader reader = null;
+    private char[] buffer = new char[1024];
+    private int offset = 0;
+    private String topic = null;
+
+    private Long streamOffset;
+
+    @Override
+    public void start(Properties props) {
+        String filename = props.getProperty(FileStreamSourceConnector.FILE_CONFIG);
+        if (filename == null) {
+            stream = System.in;
+            // Tracking offset for stdin doesn't make sense
+            streamOffset = null;
+        } else {
+            try {
+                stream = new FileInputStream(filename);
+                Long lastRecordedOffset = (Long) context.getOffsetStorageReader().getOffset(null);
+                if (lastRecordedOffset != null) {
+                    log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
+                    long skipLeft = lastRecordedOffset;
+                    while (skipLeft > 0) {
+                        try {
+                            long skipped = stream.skip(skipLeft);
+                            skipLeft -= skipped;
+                        } catch (IOException e) {
+                            log.error("Error while trying to seek to previous offset in file: ", e);
+                            throw new CopycatException(e);
+                        }
+                    }
+                    log.debug("Skipped to offset {}", lastRecordedOffset);
+                }
+                streamOffset = (lastRecordedOffset != null) ? lastRecordedOffset : 0L;
+            } catch (FileNotFoundException e) {
+                throw new CopycatException("Couldn't find file for FileStreamSourceTask: {}", e);
+            }
+        }
+        topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
+        if (topic == null)
+            throw new CopycatException("ConsoleSourceTask config missing topic setting");
+        reader = new BufferedReader(new InputStreamReader(stream));
+    }
+
+    @Override
+    public List<SourceRecord> poll() throws InterruptedException {
+        // Unfortunately we can't just use readLine() because it blocks in an uninterruptible way.
+        // Instead we have to manage splitting lines ourselves, using simple backoff when no new data
+        // is available.
+        try {
+            final BufferedReader readerCopy;
+            synchronized (this) {
+                readerCopy = reader;
+            }
+            if (readerCopy == null)
+                return null;
+
+            ArrayList<SourceRecord> records = null;
+
+            int nread = 0;
+            while (readerCopy.ready()) {
+                nread = readerCopy.read(buffer, offset, buffer.length - offset);
+
+                if (nread > 0) {
+                    offset += nread;
+                    if (offset == buffer.length) {
+                        char[] newbuf = new char[buffer.length * 2];
+                        System.arraycopy(buffer, 0, newbuf, 0, buffer.length);
+                        buffer = newbuf;
+                    }
+
+                    String line;
+                    do {
+                        line = extractLine();
+                        if (line != null) {
+                            if (records == null)
+                                records = new ArrayList<>();
+                            records.add(new SourceRecord(null, streamOffset, topic, line));
+                        }
+                        new ArrayList<SourceRecord>();
+                    } while (line != null);
+                }
+            }
+
+            if (nread <= 0)
+                Thread.sleep(1);
+
+            return records;
+        } catch (IOException e) {
+            // Underlying stream was killed, probably as a result of calling stop. Allow to return
+            // null, and driving thread will handle any shutdown if necessary.
+        }
+        return null;
+    }
+
+    private String extractLine() {
+        int until = -1, newStart = -1;
+        for (int i = 0; i < offset; i++) {
+            if (buffer[i] == '\n') {
+                until = i;
+                newStart = i + 1;
+                break;
+            } else if (buffer[i] == '\r') {
+                // We need to check for \r\n, so we must skip this if we can't check the next char
+                if (i + 1 >= offset)
+                    return null;
+
+                until = i;
+                newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1;
+                break;
+            }
+        }
+
+        if (until != -1) {
+            String result = new String(buffer, 0, until);
+            System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart);
+            offset = offset - newStart;
+            if (streamOffset != null)
+                streamOffset += newStart;
+            return result;
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public void stop() {
+        log.trace("Stopping");
+        synchronized (this) {
+            try {
+                stream.close();
+                log.trace("Closed input stream");
+            } catch (IOException e) {
+                log.error("Failed to close ConsoleSourceTask stream: ", e);
+            }
+            reader = null;
+            stream = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
new file mode 100644
index 0000000..643fb43
--- /dev/null
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.file;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.copycat.connector.ConnectorContext;
+import org.apache.kafka.copycat.sink.SinkConnector;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.easymock.PowerMock;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class FileStreamSinkConnectorTest {
+
+    private static final String MULTIPLE_TOPICS = "test1,test2";
+    private static final String[] MULTIPLE_TOPICS_LIST
+            = MULTIPLE_TOPICS.split(",");
+    private static final List<TopicPartition> MULTIPLE_TOPICS_PARTITIONS = Arrays.asList(
+            new TopicPartition("test1", 1), new TopicPartition("test2", 2)
+    );
+    private static final String FILENAME = "/afilename";
+
+    private FileStreamSinkConnector connector;
+    private ConnectorContext ctx;
+    private Properties sinkProperties;
+
+    @Before
+    public void setup() {
+        connector = new FileStreamSinkConnector();
+        ctx = PowerMock.createMock(ConnectorContext.class);
+        connector.initialize(ctx);
+
+        sinkProperties = new Properties();
+        sinkProperties.setProperty(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS);
+        sinkProperties.setProperty(FileStreamSinkConnector.FILE_CONFIG, FILENAME);
+    }
+
+    @Test
+    public void testSinkTasks() {
+        PowerMock.replayAll();
+
+        connector.start(sinkProperties);
+        List<Properties> taskConfigs = connector.getTaskConfigs(1);
+        assertEquals(1, taskConfigs.size());
+        assertEquals(FILENAME, taskConfigs.get(0).getProperty(FileStreamSinkConnector.FILE_CONFIG));
+
+        taskConfigs = connector.getTaskConfigs(2);
+        assertEquals(2, taskConfigs.size());
+        for (int i = 0; i < 2; i++) {
+            assertEquals(FILENAME, taskConfigs.get(0).getProperty(FileStreamSinkConnector.FILE_CONFIG));
+        }
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testTaskClass() {
+        PowerMock.replayAll();
+
+        connector.start(sinkProperties);
+        assertEquals(FileStreamSinkTask.class, connector.getTaskClass());
+
+        PowerMock.verifyAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java
new file mode 100644
index 0000000..b4e1b0c
--- /dev/null
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.file;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.copycat.sink.SinkRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class FileStreamSinkTaskTest {
+
+    private FileStreamSinkTask task;
+    private ByteArrayOutputStream os;
+    private PrintStream printStream;
+
+    @Before
+    public void setup() {
+        os = new ByteArrayOutputStream();
+        printStream = new PrintStream(os);
+        task = new FileStreamSinkTask(printStream);
+    }
+
+    @Test
+    public void testPutFlush() {
+        HashMap<TopicPartition, Long> offsets = new HashMap<>();
+
+        // We do not call task.start() since it would override the output stream
+
+        task.put(Arrays.asList(
+                new SinkRecord("topic1", 0, null, "line1", 1)
+        ));
+        offsets.put(new TopicPartition("topic1", 0), 1L);
+        task.flush(offsets);
+        assertEquals("line1\n", os.toString());
+
+        task.put(Arrays.asList(
+                new SinkRecord("topic1", 0, null, "line2", 2),
+                new SinkRecord("topic2", 0, null, "line3", 1)
+        ));
+        offsets.put(new TopicPartition("topic1", 0), 2L);
+        offsets.put(new TopicPartition("topic2", 0), 1L);
+        task.flush(offsets);
+        assertEquals("line1\nline2\nline3\n", os.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
new file mode 100644
index 0000000..e23055c
--- /dev/null
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.file;
+
+import org.apache.kafka.copycat.connector.ConnectorContext;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.easymock.PowerMock;
+
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class FileStreamSourceConnectorTest {
+
+    private static final String SINGLE_TOPIC = "test";
+    private static final String MULTIPLE_TOPICS = "test1,test2";
+    private static final String FILENAME = "/somefilename";
+
+    private FileStreamSourceConnector connector;
+    private ConnectorContext ctx;
+    private Properties sourceProperties;
+
+    @Before
+    public void setup() {
+        connector = new FileStreamSourceConnector();
+        ctx = PowerMock.createMock(ConnectorContext.class);
+        connector.initialize(ctx);
+
+        sourceProperties = new Properties();
+        sourceProperties.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, SINGLE_TOPIC);
+        sourceProperties.setProperty(FileStreamSourceConnector.FILE_CONFIG, FILENAME);
+    }
+
+    @Test
+    public void testSourceTasks() {
+        PowerMock.replayAll();
+
+        connector.start(sourceProperties);
+        List<Properties> taskConfigs = connector.getTaskConfigs(1);
+        assertEquals(1, taskConfigs.size());
+        assertEquals(FILENAME,
+                taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG));
+        assertEquals(SINGLE_TOPIC,
+                taskConfigs.get(0).getProperty(FileStreamSourceConnector.TOPIC_CONFIG));
+
+        // Should be able to return fewer than requested #
+        taskConfigs = connector.getTaskConfigs(2);
+        assertEquals(1, taskConfigs.size());
+        assertEquals(FILENAME,
+                taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG));
+        assertEquals(SINGLE_TOPIC,
+                taskConfigs.get(0).getProperty(FileStreamSourceConnector.TOPIC_CONFIG));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSourceTasksStdin() {
+        PowerMock.replayAll();
+
+        sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+        connector.start(sourceProperties);
+        List<Properties> taskConfigs = connector.getTaskConfigs(1);
+        assertEquals(1, taskConfigs.size());
+        assertNull(taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = CopycatException.class)
+    public void testMultipleSourcesInvalid() {
+        sourceProperties.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS);
+        connector.start(sourceProperties);
+    }
+
+    @Test
+    public void testTaskClass() {
+        PowerMock.replayAll();
+
+        connector.start(sourceProperties);
+        assertEquals(FileStreamSourceTask.class, connector.getTaskClass());
+
+        PowerMock.verifyAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
new file mode 100644
index 0000000..0ec71d3
--- /dev/null
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.file;
+
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.source.SourceRecord;
+import org.apache.kafka.copycat.source.SourceTaskContext;
+import org.apache.kafka.copycat.storage.OffsetStorageReader;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.easymock.PowerMock;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class FileStreamSourceTaskTest {
+
+    private static final String TOPIC = "test";
+
+    private File tempFile;
+    private Properties config;
+    private OffsetStorageReader offsetStorageReader;
+    private FileStreamSourceTask task;
+
+    private boolean verifyMocks = false;
+
+    @Before
+    public void setup() throws IOException {
+        tempFile = File.createTempFile("file-stream-source-task-test", null);
+        config = new Properties();
+        config.setProperty(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
+        config.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
+        task = new FileStreamSourceTask();
+        offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
+        task.initialize(new SourceTaskContext(offsetStorageReader));
+    }
+
+    @After
+    public void teardown() {
+        tempFile.delete();
+
+        if (verifyMocks)
+            PowerMock.verifyAll();
+    }
+
+    private void replay() {
+        PowerMock.replayAll();
+        verifyMocks = true;
+    }
+
+    @Test
+    public void testNormalLifecycle() throws InterruptedException, IOException {
+        expectOffsetLookupReturnNone();
+        replay();
+
+        task.start(config);
+
+        FileOutputStream os = new FileOutputStream(tempFile);
+        assertEquals(null, task.poll());
+        os.write("partial line".getBytes());
+        os.flush();
+        assertEquals(null, task.poll());
+        os.write(" finished\n".getBytes());
+        os.flush();
+        List<SourceRecord> records = task.poll();
+        assertEquals(1, records.size());
+        assertEquals(TOPIC, records.get(0).getTopic());
+        assertEquals("partial line finished", records.get(0).getValue());
+        assertEquals(22L, records.get(0).getSourceOffset());
+        assertEquals(null, task.poll());
+
+        // Different line endings, and make sure the final \r doesn't result in a line until we can
+        // read the subsequent byte.
+        os.write("line1\rline2\r\nline3\nline4\n\r".getBytes());
+        os.flush();
+        records = task.poll();
+        assertEquals(4, records.size());
+        assertEquals("line1", records.get(0).getValue());
+        assertEquals(28L, records.get(0).getSourceOffset());
+        assertEquals("line2", records.get(1).getValue());
+        assertEquals(35L, records.get(1).getSourceOffset());
+        assertEquals("line3", records.get(2).getValue());
+        assertEquals(41L, records.get(2).getSourceOffset());
+        assertEquals("line4", records.get(3).getValue());
+        assertEquals(47L, records.get(3).getSourceOffset());
+
+        os.write("subsequent text".getBytes());
+        os.flush();
+        records = task.poll();
+        assertEquals(1, records.size());
+        assertEquals("", records.get(0).getValue());
+        assertEquals(48L, records.get(0).getSourceOffset());
+
+        task.stop();
+    }
+
+    @Test(expected = CopycatException.class)
+    public void testMissingTopic() {
+        expectOffsetLookupReturnNone();
+        replay();
+
+        config.remove(FileStreamSourceConnector.TOPIC_CONFIG);
+        task.start(config);
+    }
+
+    @Test(expected = CopycatException.class)
+    public void testInvalidFile() {
+        config.setProperty(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename");
+        task.start(config);
+    }
+
+
+    private void expectOffsetLookupReturnNone() {
+        EasyMock.expect(
+                offsetStorageReader.getOffset(EasyMock.anyObject(Object.class)))
+                .andReturn(null);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
new file mode 100644
index 0000000..36a6ca8
--- /dev/null
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.json;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.copycat.data.*;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.storage.Converter;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * Implementation of Converter that uses JSON to store schemas and objects.
+ */
+public class JsonConverter implements Converter<JsonNode> {
+
+    private static final HashMap<String, JsonToCopycatTypeConverter> TO_COPYCAT_CONVERTERS
+            = new HashMap<>();
+
+    static {
+        TO_COPYCAT_CONVERTERS.put(JsonSchema.BOOLEAN_TYPE_NAME, new JsonToCopycatTypeConverter() {
+            @Override
+            public Object convert(JsonNode jsonSchema, JsonNode value) {
+                return value.booleanValue();
+            }
+        });
+        TO_COPYCAT_CONVERTERS.put(JsonSchema.INT_TYPE_NAME, new JsonToCopycatTypeConverter() {
+            @Override
+            public Object convert(JsonNode jsonSchema, JsonNode value) {
+                return value.intValue();
+            }
+        });
+        TO_COPYCAT_CONVERTERS.put(JsonSchema.LONG_TYPE_NAME, new JsonToCopycatTypeConverter() {
+            @Override
+            public Object convert(JsonNode jsonSchema, JsonNode value) {
+                return value.longValue();
+            }
+        });
+        TO_COPYCAT_CONVERTERS.put(JsonSchema.FLOAT_TYPE_NAME, new JsonToCopycatTypeConverter() {
+            @Override
+            public Object convert(JsonNode jsonSchema, JsonNode value) {
+                return value.floatValue();
+            }
+        });
+        TO_COPYCAT_CONVERTERS.put(JsonSchema.DOUBLE_TYPE_NAME, new JsonToCopycatTypeConverter() {
+            @Override
+            public Object convert(JsonNode jsonSchema, JsonNode value) {
+                return value.doubleValue();
+            }
+        });
+        TO_COPYCAT_CONVERTERS.put(JsonSchema.BYTES_TYPE_NAME, new JsonToCopycatTypeConverter() {
+            @Override
+            public Object convert(JsonNode jsonSchema, JsonNode value) {
+                try {
+                    return value.binaryValue();
+                } catch (IOException e) {
+                    throw new CopycatException("Invalid bytes field", e);
+                }
+            }
+        });
+        TO_COPYCAT_CONVERTERS.put(JsonSchema.STRING_TYPE_NAME, new JsonToCopycatTypeConverter() {
+            @Override
+            public Object convert(JsonNode jsonSchema, JsonNode value) {
+                return value.textValue();
+            }
+        });
+        TO_COPYCAT_CONVERTERS.put(JsonSchema.ARRAY_TYPE_NAME, new JsonToCopycatTypeConverter() {
+            @Override
+            public Object convert(JsonNode jsonSchema, JsonNode value) {
+                JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
+                if (elemSchema == null)
+                    throw new CopycatException("Array schema did not specify the element type");
+                ArrayList<Object> result = new ArrayList<>();
+                for (JsonNode elem : value) {
+                    result.add(convertToCopycat(elemSchema, elem));
+                }
+                return result;
+            }
+        });
+
+    }
+
+    @Override
+    public JsonNode fromCopycatData(Object value) {
+        return convertToJsonWithSchemaEnvelope(value);
+    }
+
+    @Override
+    public Object toCopycatData(JsonNode value) {
+        if (!value.isObject() || value.size() != 2 || !value.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !value.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
+            throw new CopycatException("JSON value converted to Copycat must be in envelope containing schema");
+
+        return convertToCopycat(value.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), value.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    }
+
+
+    private static JsonNode asJsonSchema(Schema schema) {
+        switch (schema.getType()) {
+            case BOOLEAN:
+                return JsonSchema.BOOLEAN_SCHEMA;
+            case BYTES:
+                return JsonSchema.BYTES_SCHEMA;
+            case DOUBLE:
+                return JsonSchema.DOUBLE_SCHEMA;
+            case FLOAT:
+                return JsonSchema.FLOAT_SCHEMA;
+            case INT:
+                return JsonSchema.INT_SCHEMA;
+            case LONG:
+                return JsonSchema.LONG_SCHEMA;
+            case NULL:
+                throw new UnsupportedOperationException("null schema not supported");
+            case STRING:
+                return JsonSchema.STRING_SCHEMA;
+            case UNION: {
+                throw new UnsupportedOperationException("union schema not supported");
+            }
+            case ARRAY:
+                return JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME)
+                        .set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, asJsonSchema(schema.getElementType()));
+            case ENUM:
+                throw new UnsupportedOperationException("enum schema not supported");
+            case MAP:
+                throw new UnsupportedOperationException("map schema not supported");
+            default:
+                throw new CopycatException("Couldn't translate unsupported schema type " + schema.getType().getName() + ".");
+        }
+    }
+
+
+    private static Schema asCopycatSchema(JsonNode jsonSchema) {
+        if (jsonSchema.isNull())
+            return null;
+
+        JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
+        if (schemaTypeNode == null || !schemaTypeNode.isTextual())
+            throw new CopycatException("Schema must contain 'type' field");
+
+        switch (schemaTypeNode.textValue()) {
+            case JsonSchema.BOOLEAN_TYPE_NAME:
+                return SchemaBuilder.builder().booleanType();
+            case JsonSchema.INT_TYPE_NAME:
+                return SchemaBuilder.builder().intType();
+            case JsonSchema.LONG_TYPE_NAME:
+                return SchemaBuilder.builder().longType();
+            case JsonSchema.FLOAT_TYPE_NAME:
+                return SchemaBuilder.builder().floatType();
+            case JsonSchema.DOUBLE_TYPE_NAME:
+                return SchemaBuilder.builder().doubleType();
+            case JsonSchema.BYTES_TYPE_NAME:
+                return SchemaBuilder.builder().bytesType();
+            case JsonSchema.STRING_TYPE_NAME:
+                return SchemaBuilder.builder().stringType();
+            case JsonSchema.ARRAY_TYPE_NAME:
+                JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
+                if (elemSchema == null)
+                    throw new CopycatException("Array schema did not specify the element type");
+                return Schema.createArray(asCopycatSchema(elemSchema));
+            default:
+                throw new CopycatException("Unknown schema type: " + schemaTypeNode.textValue());
+        }
+    }
+
+
+    /**
+     * Convert this object, in org.apache.kafka.copycat.data format, into a JSON object with an envelope object
+     * containing schema and payload fields.
+     * @param value
+     * @return
+     */
+    private static JsonNode convertToJsonWithSchemaEnvelope(Object value) {
+        return convertToJson(value).toJsonNode();
+    }
+
+    /**
+     * Convert this object, in the org.apache.kafka.copycat.data format, into a JSON object, returning both the schema
+     * and the converted object.
+     */
+    private static JsonSchema.Envelope convertToJson(Object value) {
+        if (value == null) {
+            return JsonSchema.nullEnvelope();
+        } else if (value instanceof Boolean) {
+            return JsonSchema.booleanEnvelope((Boolean) value);
+        } else if (value instanceof Byte) {
+            return JsonSchema.intEnvelope((Byte) value);
+        } else if (value instanceof Short) {
+            return JsonSchema.intEnvelope((Short) value);
+        } else if (value instanceof Integer) {
+            return JsonSchema.intEnvelope((Integer) value);
+        } else if (value instanceof Long) {
+            return JsonSchema.longEnvelope((Long) value);
+        } else if (value instanceof Float) {
+            return JsonSchema.floatEnvelope((Float) value);
+        } else if (value instanceof Double) {
+            return JsonSchema.doubleEnvelope((Double) value);
+        } else if (value instanceof byte[]) {
+            return JsonSchema.bytesEnvelope((byte[]) value);
+        } else if (value instanceof ByteBuffer) {
+            return JsonSchema.bytesEnvelope(((ByteBuffer) value).array());
+        } else if (value instanceof CharSequence) {
+            return JsonSchema.stringEnvelope(value.toString());
+        } else if (value instanceof Collection) {
+            Collection collection = (Collection) value;
+            ObjectNode schema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME);
+            ArrayNode list = JsonNodeFactory.instance.arrayNode();
+            JsonNode itemSchema = null;
+            for (Object elem : collection) {
+                JsonSchema.Envelope fieldSchemaAndValue = convertToJson(elem);
+                if (itemSchema == null) {
+                    itemSchema = fieldSchemaAndValue.schema;
+                    schema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, itemSchema);
+                } else {
+                    if (!itemSchema.equals(fieldSchemaAndValue.schema))
+                        throw new CopycatException("Mismatching schemas found in a list.");
+                }
+
+                list.add(fieldSchemaAndValue.payload);
+            }
+            return new JsonSchema.Envelope(schema, list);
+        }
+
+        throw new CopycatException("Couldn't convert " + value + " to Avro.");
+    }
+
+
+    private static Object convertToCopycat(JsonNode jsonSchema, JsonNode jsonValue) {
+        if (jsonSchema.isNull())
+            return null;
+
+        JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
+        if (schemaTypeNode == null || !schemaTypeNode.isTextual())
+            throw new CopycatException("Schema must contain 'type' field. Schema: " + jsonSchema.toString());
+
+        JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schemaTypeNode.textValue());
+        if (typeConverter != null)
+            return typeConverter.convert(jsonSchema, jsonValue);
+
+        throw new CopycatException("Unknown schema type: " + schemaTypeNode);
+    }
+
+
+    private interface JsonToCopycatTypeConverter {
+        Object convert(JsonNode schema, JsonNode value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java
new file mode 100644
index 0000000..29c7bac
--- /dev/null
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.copycat.json;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+
+/**
+ * JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily
+ * structured data without having associated Java classes. This deserializer also supports Copycat schemas.
+ */
+public class JsonDeserializer implements Deserializer<JsonNode> {
+    private static final ObjectNode CATCH_ALL_OBJECT_SCHEMA = JsonNodeFactory.instance.objectNode();
+    private static final ObjectNode CATCH_ALL_ARRAY_SCHEMA = JsonNodeFactory.instance.objectNode();
+    private static final ArrayNode ALL_SCHEMAS_LIST = JsonNodeFactory.instance.arrayNode();
+    private static final ObjectNode CATCH_ALL_SCHEMA = JsonNodeFactory.instance.objectNode();
+    static {
+        CATCH_ALL_OBJECT_SCHEMA.put("type", "object")
+                .putArray("field").add(JsonNodeFactory.instance.objectNode().put("*", "all"));
+
+        CATCH_ALL_ARRAY_SCHEMA.put("type", "array").put("items", "all");
+
+        ALL_SCHEMAS_LIST.add("boolean").add("int").add("long").add("float").add("double").add("bytes").add("string")
+                .add(CATCH_ALL_ARRAY_SCHEMA).add(CATCH_ALL_OBJECT_SCHEMA);
+
+        CATCH_ALL_SCHEMA.put("name", "all").set("type", ALL_SCHEMAS_LIST);
+    }
+
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Default constructor needed by Kafka
+     */
+    public JsonDeserializer() {
+    }
+
+    @Override
+    public void configure(Map<String, ?> props, boolean isKey) {
+    }
+
+    @Override
+    public JsonNode deserialize(String topic, byte[] bytes) {
+        JsonNode data;
+        try {
+            data = objectMapper.readTree(bytes);
+        } catch (Exception e) {
+            throw new SerializationException(e);
+        }
+
+        // The deserialized data should either be an envelope object containing the schema and the payload or the schema
+        // was stripped during serialization and we need to fill in an all-encompassing schema.
+        if (!data.isObject() || data.size() != 2 || !data.has("schema") || !data.has("payload")) {
+            ObjectNode envelope = JsonNodeFactory.instance.objectNode();
+            envelope.set("schema", CATCH_ALL_SCHEMA);
+            envelope.set("payload", data);
+            data = envelope;
+        }
+
+        return data;
+    }
+
+    @Override
+    public void close() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java
new file mode 100644
index 0000000..a807e0f
--- /dev/null
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.json;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.nio.ByteBuffer;
+
+public class JsonSchema {
+
+    static final String ENVELOPE_SCHEMA_FIELD_NAME = "schema";
+    static final String ENVELOPE_PAYLOAD_FIELD_NAME = "payload";
+    static final String SCHEMA_TYPE_FIELD_NAME = "type";
+    static final String SCHEMA_NAME_FIELD_NAME = "name";
+    static final String ARRAY_ITEMS_FIELD_NAME = "items";
+    static final String BOOLEAN_TYPE_NAME = "boolean";
+    static final JsonNode BOOLEAN_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, BOOLEAN_TYPE_NAME);
+    static final String INT_TYPE_NAME = "int";
+    static final JsonNode INT_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, INT_TYPE_NAME);
+    static final String LONG_TYPE_NAME = "long";
+    static final JsonNode LONG_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, LONG_TYPE_NAME);
+    static final String FLOAT_TYPE_NAME = "float";
+    static final JsonNode FLOAT_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, FLOAT_TYPE_NAME);
+    static final String DOUBLE_TYPE_NAME = "double";
+    static final JsonNode DOUBLE_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, DOUBLE_TYPE_NAME);
+    static final String BYTES_TYPE_NAME = "bytes";
+    static final JsonNode BYTES_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, BYTES_TYPE_NAME);
+    static final String STRING_TYPE_NAME = "string";
+    static final JsonNode STRING_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, STRING_TYPE_NAME);
+    static final String ARRAY_TYPE_NAME = "array";
+
+    public static ObjectNode envelope(JsonNode schema, JsonNode payload) {
+        ObjectNode result = JsonNodeFactory.instance.objectNode();
+        result.set(ENVELOPE_SCHEMA_FIELD_NAME, schema);
+        result.set(ENVELOPE_PAYLOAD_FIELD_NAME, payload);
+        return result;
+    }
+
+    static class Envelope {
+        public JsonNode schema;
+        public JsonNode payload;
+
+        public Envelope(JsonNode schema, JsonNode payload) {
+            this.schema = schema;
+            this.payload = payload;
+        }
+
+        public ObjectNode toJsonNode() {
+            return envelope(schema, payload);
+        }
+    }
+
+
+    public static Envelope nullEnvelope() {
+        return new Envelope(null, null);
+    }
+
+    public static Envelope booleanEnvelope(boolean value) {
+        return new Envelope(JsonSchema.BOOLEAN_SCHEMA, JsonNodeFactory.instance.booleanNode(value));
+    }
+
+    public static Envelope intEnvelope(byte value) {
+        return new Envelope(JsonSchema.INT_SCHEMA, JsonNodeFactory.instance.numberNode(value));
+    }
+
+    public static Envelope intEnvelope(short value) {
+        return new Envelope(JsonSchema.INT_SCHEMA, JsonNodeFactory.instance.numberNode(value));
+    }
+
+    public static Envelope intEnvelope(int value) {
+        return new Envelope(JsonSchema.INT_SCHEMA, JsonNodeFactory.instance.numberNode(value));
+    }
+
+    public static Envelope longEnvelope(long value) {
+        return new Envelope(JsonSchema.LONG_SCHEMA, JsonNodeFactory.instance.numberNode(value));
+    }
+
+    public static Envelope floatEnvelope(float value) {
+        return new Envelope(JsonSchema.FLOAT_SCHEMA, JsonNodeFactory.instance.numberNode(value));
+    }
+
+    public static Envelope doubleEnvelope(double value) {
+        return new Envelope(JsonSchema.DOUBLE_SCHEMA, JsonNodeFactory.instance.numberNode(value));
+    }
+
+    public static Envelope bytesEnvelope(byte[] value) {
+        return new Envelope(JsonSchema.BYTES_SCHEMA, JsonNodeFactory.instance.binaryNode(value));
+    }
+
+    public static Envelope bytesEnvelope(ByteBuffer value) {
+        return new Envelope(JsonSchema.BYTES_SCHEMA, JsonNodeFactory.instance.binaryNode(value.array()));
+    }
+
+    public static Envelope stringEnvelope(CharSequence value) {
+        return new Envelope(JsonSchema.STRING_SCHEMA, JsonNodeFactory.instance.textNode(value.toString()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
new file mode 100644
index 0000000..dcac270
--- /dev/null
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.copycat.json;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.util.Map;
+
+/**
+ * Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree model allows handling arbitrarily
+ * structured data without corresponding Java classes. This serializer also supports Copycat schemas.
+ */
+public class JsonSerializer implements Serializer<JsonNode> {
+
+    private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable";
+    private static final boolean SCHEMAS_ENABLE_DEFAULT = true;
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT;
+
+    /**
+     * Default constructor needed by Kafka
+     */
+    public JsonSerializer() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> config, boolean isKey) {
+        Object enableConfigsVal = config.get(SCHEMAS_ENABLE_CONFIG);
+        if (enableConfigsVal != null)
+            enableSchemas = enableConfigsVal.toString().equals("true");
+    }
+
+    @Override
+    public byte[] serialize(String topic, JsonNode data) {
+        // This serializer works for Copycat data that requires a schema to be included, so we expect it to have a
+        // specific format: { "schema": {...}, "payload": ... }.
+        if (!data.isObject() || data.size() != 2 || !data.has("schema") || !data.has("payload"))
+            throw new SerializationException("JsonSerializer requires \"schema\" and \"payload\" fields and may not contain additional fields");
+
+        try {
+            if (!enableSchemas)
+                data = data.get("payload");
+            return objectMapper.writeValueAsBytes(data);
+        } catch (Exception e) {
+            throw new SerializationException("Error serializing JSON message", e);
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
new file mode 100644
index 0000000..1a725c9
--- /dev/null
+++ b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.json;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class JsonConverterTest {
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    JsonConverter converter = new JsonConverter();
+
+    @Test
+    public void booleanToCopycat() {
+        assertEquals(true, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }")));
+        assertEquals(false, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": false }")));
+    }
+
+    @Test
+    public void intToCopycat() {
+        assertEquals(12, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int\" }, \"payload\": 12 }")));
+    }
+
+    @Test
+    public void longToCopycat() {
+        assertEquals(12L, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"long\" }, \"payload\": 12 }")));
+        assertEquals(4398046511104L, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"long\" }, \"payload\": 4398046511104 }")));
+    }
+
+    @Test
+    public void floatToCopycat() {
+        assertEquals(12.34f, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }")));
+    }
+
+    @Test
+    public void doubleToCopycat() {
+        assertEquals(12.34, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"double\" }, \"payload\": 12.34 }")));
+    }
+
+
+    @Test
+    public void bytesToCopycat() throws UnsupportedEncodingException {
+        ByteBuffer reference = ByteBuffer.wrap("test-string".getBytes("UTF-8"));
+        String msg = "{ \"schema\": { \"type\": \"bytes\" }, \"payload\": \"dGVzdC1zdHJpbmc=\" }";
+        ByteBuffer converted = ByteBuffer.wrap((byte[]) converter.toCopycatData(parse(msg)));
+        assertEquals(reference, converted);
+    }
+
+    @Test
+    public void stringToCopycat() {
+        assertEquals("foo-bar-baz", converter.toCopycatData(parse("{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }")));
+    }
+
+    @Test
+    public void arrayToCopycat() {
+        JsonNode arrayJson = parse("{ \"schema\": { \"type\": \"array\", \"items\": { \"type\" : \"int\" } }, \"payload\": [1, 2, 3] }");
+        assertEquals(Arrays.asList(1, 2, 3), converter.toCopycatData(arrayJson));
+    }
+
+
+    @Test
+    public void booleanToJson() {
+        JsonNode converted = converter.fromCopycatData(true);
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"boolean\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
+    }
+
+    @Test
+    public void intToJson() {
+        JsonNode converted = converter.fromCopycatData(12);
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"int\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
+    }
+
+    @Test
+    public void longToJson() {
+        JsonNode converted = converter.fromCopycatData(4398046511104L);
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"long\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(4398046511104L, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).longValue());
+    }
+
+    @Test
+    public void floatToJson() {
+        JsonNode converted = converter.fromCopycatData(12.34f);
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"float\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(12.34f, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).floatValue(), 0.001);
+    }
+
+    @Test
+    public void doubleToJson() {
+        JsonNode converted = converter.fromCopycatData(12.34);
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"double\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(12.34, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).doubleValue(), 0.001);
+    }
+
+    @Test
+    public void bytesToJson() throws IOException {
+        JsonNode converted = converter.fromCopycatData("test-string".getBytes());
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"bytes\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(ByteBuffer.wrap("test-string".getBytes()),
+                ByteBuffer.wrap(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).binaryValue()));
+    }
+
+    @Test
+    public void stringToJson() {
+        JsonNode converted = converter.fromCopycatData("test-string");
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"string\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals("test-string", converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).textValue());
+    }
+
+    @Test
+    public void arrayToJson() {
+        JsonNode converted = converter.fromCopycatData(Arrays.asList(1, 2, 3));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"array\", \"items\": { \"type\": \"int\" } }"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(JsonNodeFactory.instance.arrayNode().add(1).add(2).add(3),
+                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    }
+
+
+    private JsonNode parse(String json) {
+        try {
+            return objectMapper.readTree(json);
+        } catch (IOException e) {
+            fail("IOException during JSON parse: " + e.getMessage());
+            throw new RuntimeException("failed");
+        }
+    }
+
+    private void validateEnvelope(JsonNode env) {
+        assertNotNull(env);
+        assertTrue(env.isObject());
+        assertEquals(2, env.size());
+        assertTrue(env.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isObject());
+        assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
new file mode 100644
index 0000000..130a529
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.cli;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.runtime.Herder;
+import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * <p>
+ * Command line utility that runs Copycat as a standalone process. In this mode, work is not
+ * distributed. Instead, all the normal Copycat machinery works within a single process. This is
+ * useful for ad hoc, small, or experimental jobs.
+ * </p>
+ * <p>
+ * By default, no job configs or offset data is persistent. You can make jobs persistent and
+ * fault tolerant by overriding the settings to use file storage for both.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public class CopycatStandalone {
+    private static final Logger log = LoggerFactory.getLogger(CopycatStandalone.class);
+
+    public static void main(String[] args) throws Exception {
+        Properties workerProps;
+        Properties connectorProps;
+
+        if (args.length < 2) {
+            log.info("Usage: CopycatStandalone worker.properties connector1.properties [connector2.properties ...]");
+            System.exit(1);
+        }
+
+        String workerPropsFile = args[0];
+        workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties();
+
+        WorkerConfig workerConfig = new WorkerConfig(workerProps);
+        Worker worker = new Worker(workerConfig);
+        Herder herder = new StandaloneHerder(worker);
+        final org.apache.kafka.copycat.runtime.Copycat copycat = new org.apache.kafka.copycat.runtime.Copycat(worker, herder);
+        copycat.start();
+
+        try {
+            for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
+                connectorProps = Utils.loadProps(connectorPropsFile);
+                FutureCallback<String> cb = new FutureCallback<>(new Callback<String>() {
+                    @Override
+                    public void onCompletion(Throwable error, String id) {
+                        if (error != null)
+                            log.error("Failed to create job for {}", connectorPropsFile);
+                    }
+                });
+                herder.addConnector(connectorProps, cb);
+                cb.get();
+            }
+        } catch (Throwable t) {
+            log.error("Stopping after connector error", t);
+            copycat.stop();
+        }
+
+        // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
+        copycat.awaitStop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
new file mode 100644
index 0000000..46229db
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.cli;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Configuration for standalone workers.
+ */
+@InterfaceStability.Unstable
+public class WorkerConfig extends AbstractConfig {
+
+    public static final String CLUSTER_CONFIG = "cluster";
+    private static final String
+            CLUSTER_CONFIG_DOC =
+            "ID for this cluster, which is used to provide a namespace so multiple Copycat clusters "
+                    + "or instances may co-exist while sharing a single Kafka cluster.";
+    public static final String CLUSTER_DEFAULT = "copycat";
+
+    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+    public static final String BOOTSTRAP_SERVERS_DOC
+            = "A list of host/port pairs to use for establishing the initial connection to the Kafka "
+            + "cluster. The client will make use of all servers irrespective of which servers are "
+            + "specified here for bootstrapping&mdash;this list only impacts the initial hosts used "
+            + "to discover the full set of servers. This list should be in the form "
+            + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the "
+            + "initial connection to discover the full cluster membership (which may change "
+            + "dynamically), this list need not contain the full set of servers (you may want more "
+            + "than one, though, in case a server is down).";
+    public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";
+
+    public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
+    public static final String KEY_CONVERTER_CLASS_DOC =
+            "Converter class for key Copycat data that implements the <code>Converter</code> interface.";
+
+    public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter";
+    public static final String VALUE_CONVERTER_CLASS_DOC =
+            "Converter class for value Copycat data that implements the <code>Converter</code> interface.";
+
+    public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
+    public static final String KEY_SERIALIZER_CLASS_DOC =
+            "Serializer class for key that implements the <code>Serializer</code> interface.";
+
+    public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
+    public static final String VALUE_SERIALIZER_CLASS_DOC =
+            "Serializer class for value that implements the <code>Serializer</code> interface.";
+
+    public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
+    public static final String KEY_DESERIALIZER_CLASS_DOC =
+            "Serializer class for key that implements the <code>Deserializer</code> interface.";
+
+    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
+    public static final String VALUE_DESERIALIZER_CLASS_DOC =
+            "Deserializer class for value that implements the <code>Deserializer</code> interface.";
+
+    public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
+            = "task.shutdown.graceful.timeout.ms";
+    private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC =
+            "Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time,"
+                    + " not per task. All task have shutdown triggered, then they are waited on sequentially.";
+    private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT = "5000";
+
+    public static final String OFFSET_COMMIT_INTERVAL_MS_CONFIG = "offset.flush.interval.ms";
+    private static final String OFFSET_COMMIT_INTERVAL_MS_DOC
+            = "Interval at which to try committing offsets for tasks.";
+    public static final long OFFSET_COMMIT_INTERVAL_MS_DEFAULT = 60000L;
+
+    public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = "offset.flush.timeout.ms";
+    private static final String OFFSET_COMMIT_TIMEOUT_MS_DOC
+            = "Maximum number of milliseconds to wait for records to flush and partition offset data to be"
+            + " committed to offset storage before cancelling the process and restoring the offset "
+            + "data to be committed in a future attempt.";
+    public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L;
+
+    private static ConfigDef config;
+
+    static {
+        config = new ConfigDef()
+                .define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
+                .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
+                        Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
+                .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
+                .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
+                .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
+                .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
+                .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC)
+                .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC)
+                .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
+                        TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
+                        TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
+                .define(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, OFFSET_COMMIT_INTERVAL_MS_DEFAULT,
+                        Importance.LOW, OFFSET_COMMIT_INTERVAL_MS_DOC)
+                .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, Type.LONG, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT,
+                        Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC);
+    }
+
+    public WorkerConfig() {
+        this(new Properties());
+    }
+
+    public WorkerConfig(Properties props) {
+        super(config, props);
+    }
+
+    public Properties getUnusedProperties() {
+        Set<String> unusedKeys = this.unused();
+        Properties unusedProps = new Properties();
+        for (String key : unusedKeys) {
+            unusedProps.put(key, this.originals().get(key));
+        }
+        return unusedProps;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
new file mode 100644
index 0000000..e3fcc1c
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * <p>
+ * Configuration options for Connectors. These only include Copycat system-level configuration
+ * options (e.g. Connector class name, timeouts used by Copycat to control the connector) but does
+ * not include Connector-specific options (e.g. database connection settings).
+ * </p>
+ * <p>
+ * Note that some of these options are not required for all connectors. For example TOPICS_CONFIG
+ * is sink-specific.
+ * </p>
+ */
+public class ConnectorConfig extends AbstractConfig {
+
+    public static final String NAME_CONFIG = "name";
+    private static final String NAME_DOC = "Globally unique name to use for this connector.";
+
+    public static final String CONNECTOR_CLASS_CONFIG = "connector.class";
+    private static final String CONNECTOR_CLASS_DOC =
+            "Name of the class for this connector. Must be a subclass of org.apache.kafka.copycat.connector"
+                    + ".Connector";
+
+    public static final String TASKS_MAX_CONFIG = "tasks.max";
+    private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
+    public static final int TASKS_MAX_DEFAULT = 1;
+
+    public static final String TOPICS_CONFIG = "topics";
+    private static final String TOPICS_DOC = "";
+    public static final String TOPICS_DEFAULT = "";
+
+    private static ConfigDef config;
+
+    static {
+        config = new ConfigDef()
+                .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC)
+                .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC)
+                .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC)
+                .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC);
+    }
+
+    private Properties originalProperties;
+
+    public ConnectorConfig() {
+        this(new Properties());
+    }
+
+    public ConnectorConfig(Properties props) {
+        super(config, props);
+        this.originalProperties = props;
+    }
+
+    public Properties getUnusedProperties() {
+        Set<String> unusedKeys = this.unused();
+        Properties unusedProps = new Properties();
+        for (String key : unusedKeys) {
+            unusedProps.setProperty(key, originalProperties.getProperty(key));
+        }
+        return unusedProps;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java
new file mode 100644
index 0000000..e8dfe14
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class ties together all the components of a Copycat process (herder, worker,
+ * storage, command interface), managing their lifecycle.
+ */
+@InterfaceStability.Unstable
+public class Copycat {
+    private static final Logger log = LoggerFactory.getLogger(Copycat.class);
+
+    private final Worker worker;
+    private final Herder herder;
+    private final CountDownLatch startLatch = new CountDownLatch(1);
+    private final CountDownLatch stopLatch = new CountDownLatch(1);
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
+    private final ShutdownHook shutdownHook;
+
+    public Copycat(Worker worker, Herder herder) {
+        log.debug("Copycat created");
+        this.worker = worker;
+        this.herder = herder;
+        shutdownHook = new ShutdownHook();
+    }
+
+    public void start() {
+        log.info("Copycat starting");
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+        worker.start();
+        herder.start();
+
+        log.info("Copycat started");
+
+        startLatch.countDown();
+    }
+
+    public void stop() {
+        boolean wasShuttingDown = shutdown.getAndSet(true);
+        if (!wasShuttingDown) {
+            log.info("Copycat stopping");
+
+            herder.stop();
+            worker.stop();
+
+            log.info("Copycat stopped");
+        }
+
+        stopLatch.countDown();
+    }
+
+    public void awaitStop() {
+        try {
+            stopLatch.await();
+        } catch (InterruptedException e) {
+            log.error("Interrupted waiting for Copycat to shutdown");
+        }
+    }
+
+    private class ShutdownHook extends Thread {
+        @Override
+        public void run() {
+            try {
+                startLatch.await();
+                Copycat.this.stop();
+            } catch (InterruptedException e) {
+                log.error("Interrupted in shutdown hook while waiting for copycat startup to finish");
+            }
+        }
+    }
+}


Mime
View raw message