kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject [12/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect
Date Mon, 09 Nov 2015 06:11:31 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
deleted file mode 100644
index 90651ed..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package org.apache.kafka.copycat.sink;
-
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.connector.Task;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * SinkTask is a Task takes records loaded from Kafka and sends them to another system. In
- * addition to the basic {@link #put} interface, SinkTasks must also implement {@link #flush}
- * to support offset commits.
- */
-@InterfaceStability.Unstable
-public abstract class SinkTask implements Task {
-
-    /**
-     * <p>
-     * The configuration key that provides the list of topics that are inputs for this
-     * SinkTask.
-     * </p>
-     */
-    public static final String TOPICS_CONFIG = "topics";
-
-    protected SinkTaskContext context;
-
-    public void initialize(SinkTaskContext context) {
-        this.context = context;
-    }
-
-    /**
-     * Start the Task. This should handle any configuration parsing and one-time setup of the task.
-     * @param props initial configuration
-     */
-    @Override
-    public abstract void start(Map<String, String> props);
-
-    /**
-     * Put the records in the sink. Usually this should send the records to the sink asynchronously
-     * and immediately return.
-     *
-     * If this operation fails, the SinkTask may throw a {@link org.apache.kafka.copycat.errors.RetriableException} to
-     * indicate that the framework should attempt to retry the same call again. Other exceptions will cause the task to
-     * be stopped immediately. {@link SinkTaskContext#timeout(long)} can be used to set the maximum time before the
-     * batch will be retried.
-     *
-     * @param records the set of records to send
-     */
-    public abstract void put(Collection<SinkRecord> records);
-
-    /**
-     * Flush all records that have been {@link #put} for the specified topic-partitions. The
-     * offsets are provided for convenience, but could also be determined by tracking all offsets
-     * included in the SinkRecords passed to {@link #put}.
-     *
-     * @param offsets mapping of TopicPartition to committed offset
-     */
-    public abstract void flush(Map<TopicPartition, OffsetAndMetadata> offsets);
-
-    /**
-     * The SinkTask use this method to create writers for newly assigned partitions in case of partition
-     * re-assignment. In partition re-assignment, some new partitions may be assigned to the SinkTask.
-     * The SinkTask needs to create writers and perform necessary recovery for the newly assigned partitions.
-     * This method will be called after partition re-assignment completes and before the SinkTask starts
-     * fetching data.
-     * @param partitions The list of partitions that are now assigned to the task (may include
-     *                   partitions previously assigned to the task)
-     */
-    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-    }
-
-    /**
-     * The SinkTask use this method to close writers and commit offsets for partitions that are
-     * longer assigned to the SinkTask. This method will be called before a rebalance operation starts
-     * and after the SinkTask stops fetching data.
-     * @param partitions The list of partitions that were assigned to the consumer on the last
-     *                   rebalance
-     */
-    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-    }
-
-    /**
-     * Perform any cleanup to stop this task. In SinkTasks, this method is invoked only once outstanding calls to other
-     * methods have completed (e.g., {@link #put(Collection)} has returned) and a final {@link #flush(Map)} and offset
-     * commit has completed. Implementations of this method should only need to perform final cleanup operations, such
-     * as closing network connections to the sink system.
-     */
-    public abstract void stop();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
deleted file mode 100644
index 763b9a4..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.sink;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Context passed to SinkTasks, allowing them to access utilities in the copycat runtime.
- */
-@InterfaceStability.Unstable
-public interface SinkTaskContext {
-    /**
-     * Reset the consumer offsets for the given topic partitions. SinkTasks should use this if they manage offsets
-     * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record
-     * offsets in HDFS to provide exactly once delivery. When the SinkTask is started or a rebalance occurs, the task
-     * would reload offsets from HDFS and use this method to reset the consumer to those offsets.
-     *
-     * SinkTasks that do not manage their own offsets do not need to use this method.
-     *
-     * @param offsets map of offsets for topic partitions
-     */
-    void offset(Map<TopicPartition, Long> offsets);
-
-    /**
-     * Reset the consumer offsets for the given topic partition. SinkTasks should use if they manage offsets
-     * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record
-     * offsets in HDFS to provide exactly once delivery. When the topic partition is recovered the task
-     * would reload offsets from HDFS and use this method to reset the consumer to the offset.
-     *
-     * SinkTasks that do not manage their own offsets do not need to use this method.
-     *
-     * @param tp the topic partition to reset offset.
-     * @param offset the offset to reset to.
-     */
-    void offset(TopicPartition tp, long offset);
-
-    /**
-     * Set the timeout in milliseconds. SinkTasks should use this to indicate that they need to retry certain
-     * operations after the timeout. SinkTasks may have certain operations on external systems that may need
-     * to retry in case of failures. For example, append a record to an HDFS file may fail due to temporary network
-     * issues. SinkTasks use this method to set how long to wait before retrying.
-     * @param timeoutMs the backoff timeout in milliseconds.
-     */
-    void timeout(long timeoutMs);
-
-    /**
-     * Get the current set of assigned TopicPartitions for this task.
-     * @return the set of currently assigned TopicPartitions
-     */
-    Set<TopicPartition> assignment();
-
-    /**
-     * Pause consumption of messages from the specified TopicPartitions.
-     * @param partitions the partitions which should be paused
-     */
-    void pause(TopicPartition... partitions);
-
-    /**
-     * Resume consumption of messages from previously paused TopicPartitions.
-     * @param partitions the partitions to resume
-     */
-    void resume(TopicPartition... partitions);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java
deleted file mode 100644
index 7258cdf..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package org.apache.kafka.copycat.source;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.connector.Connector;
-
-/**
- * SourceConnectors implement the connector interface to pull data from another system and send
- * it to Kafka.
- */
-@InterfaceStability.Unstable
-public abstract class SourceConnector extends Connector {
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
deleted file mode 100644
index 7f54c10..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.source;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.connector.CopycatRecord;
-import org.apache.kafka.copycat.data.Schema;
-
-import java.util.Map;
-
-/**
- * <p>
- * SourceRecords are generated by SourceTasks and passed to Copycat for storage in
- * Kafka. In addition to the standard fields in CopycatRecord which specify where data is stored
- * in Kafka, they also include a sourcePartition and sourceOffset.
- * </p>
- * <p>
- * The sourcePartition represents a single input sourcePartition that the record came from (e.g. a filename, table
- * name, or topic-partition). The sourceOffset represents a position in that sourcePartition which can be used
- * to resume consumption of data.
- * </p>
- * <p>
- * These values can have arbitrary structure and should be represented using
- * org.apache.kafka.copycat.data objects (or primitive values). For example, a database connector
- * might specify the sourcePartition as a record containing { "db": "database_name", "table":
- * "table_name"} and the sourceOffset as a Long containing the timestamp of the row.
- * </p>
- */
-@InterfaceStability.Unstable
-public class SourceRecord extends CopycatRecord {
-    private final Map<String, ?> sourcePartition;
-    private final Map<String, ?> sourceOffset;
-
-    public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
-                        String topic, Integer partition, Schema valueSchema, Object value) {
-        this(sourcePartition, sourceOffset, topic, partition, null, null, valueSchema, value);
-    }
-
-    public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
-                        String topic, Schema valueSchema, Object value) {
-        this(sourcePartition, sourceOffset, topic, null, null, null, valueSchema, value);
-    }
-
-    public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
-                        String topic, Integer partition,
-                        Schema keySchema, Object key, Schema valueSchema, Object value) {
-        super(topic, partition, keySchema, key, valueSchema, value);
-        this.sourcePartition = sourcePartition;
-        this.sourceOffset = sourceOffset;
-    }
-
-    public Map<String, ?> sourcePartition() {
-        return sourcePartition;
-    }
-
-    public Map<String, ?> sourceOffset() {
-        return sourceOffset;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-        if (!super.equals(o))
-            return false;
-
-        SourceRecord that = (SourceRecord) o;
-
-        if (sourcePartition != null ? !sourcePartition.equals(that.sourcePartition) : that.sourcePartition != null)
-            return false;
-        if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null)
-            return false;
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + (sourcePartition != null ? sourcePartition.hashCode() : 0);
-        result = 31 * result + (sourceOffset != null ? sourceOffset.hashCode() : 0);
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "SourceRecord{" +
-                "sourcePartition=" + sourcePartition +
-                ", sourceOffset=" + sourceOffset +
-                "} " + super.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
deleted file mode 100644
index 841943f..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.source;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.connector.Task;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * SourceTask is a Task that pulls records from another system for storage in Kafka.
- */
-@InterfaceStability.Unstable
-public abstract class SourceTask implements Task {
-
-    protected SourceTaskContext context;
-
-    /**
-     * Initialize this SourceTask with the specified context object.
-     */
-    public void initialize(SourceTaskContext context) {
-        this.context = context;
-    }
-
-    /**
-     * Start the Task. This should handle any configuration parsing and one-time setup of the task.
-     * @param props initial configuration
-     */
-    @Override
-    public abstract void start(Map<String, String> props);
-
-    /**
-     * Poll this SourceTask for new records. This method should block if no data is currently
-     * available.
-     *
-     * @return a list of source records
-     */
-    public abstract List<SourceRecord> poll() throws InterruptedException;
-
-    /**
-     * <p>
-     * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
-     * method should block until the commit is complete.
-     * </p>
-     * <p>
-     * SourceTasks are not required to implement this functionality; Copycat will record offsets
-     * automatically. This hook is provided for systems that also need to store offsets internally
-     * in their own system.
-     * </p>
-     */
-    public void commit() throws InterruptedException {
-        // This space intentionally left blank.
-    }
-
-    /**
-     * Signal this SourceTask to stop. In SourceTasks, this method only needs to signal to the task that it should stop
-     * trying to poll for new data and interrupt any outstanding poll() requests. It is not required that the task has
-     * fully stopped. Note that this method necessarily may be invoked from a different thread than {@link #poll()} and
-     * {@link #commit()}.
-     *
-     * For example, if a task uses a {@link java.nio.channels.Selector} to receive data over the network, this method
-     * could set a flag that will force {@link #poll()} to exit immediately and invoke
-     * {@link java.nio.channels.Selector#wakeup()} to interrupt any ongoing requests.
-     */
-    public abstract void stop();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
deleted file mode 100644
index bc18c30..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package org.apache.kafka.copycat.source;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.storage.OffsetStorageReader;
-
-/**
- * SourceTaskContext is provided to SourceTasks to allow them to interact with the underlying
- * runtime.
- */
-@InterfaceStability.Unstable
-public interface SourceTaskContext {
-    /**
-     * Get the OffsetStorageReader for this SourceTask.
-     */
-    OffsetStorageReader offsetStorageReader();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
deleted file mode 100644
index d51b789..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
-
-import java.util.Map;
-
-/**
- * The Converter interface provides support for translating between Copycat's runtime data format
- * and byte[]. Internally, this likely includes an intermediate step to the format used by the serialization
- * layer (e.g. JsonNode, GenericRecord, Message).
- */
-@InterfaceStability.Unstable
-public interface Converter {
-
-    /**
-     * Configure this class.
-     * @param configs configs in key/value pairs
-     * @param isKey whether is for key or value
-     */
-    void configure(Map<String, ?> configs, boolean isKey);
-
-    /**
-     * Convert a Copycat data object to a native object for serialization.
-     * @param topic the topic associated with the data
-     * @param schema the schema for the value
-     * @param value the value to convert
-     * @return
-     */
-    byte[] fromCopycatData(String topic, Schema schema, Object value);
-
-    /**
-     * Convert a native object to a Copycat data object.
-     * @param topic the topic associated with the data
-     * @param value the value to convert
-     * @return an object containing the {@link Schema} and the converted value
-     */
-    SchemaAndValue toCopycatData(String topic, byte[] value);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
deleted file mode 100644
index 95d2c04..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * <p>
- * OffsetStorageReader provides access to the offset storage used by sources. This can be used by
- * connectors to determine offsets to start consuming data from. This is most commonly used during
- * initialization of a task, but can also be used during runtime, e.g. when reconfiguring a task.
- * </p>
- * <p>
- * Offsets are always defined as Maps of Strings to primitive types, i.e. all types supported by
- * {@link org.apache.kafka.copycat.data.Schema} other than Array, Map, and Struct.
- * </p>
- */
-@InterfaceStability.Unstable
-public interface OffsetStorageReader {
-    /**
-     * Get the offset for the specified partition. If the data isn't already available locally, this
-     * gets it from the backing store, which may require some network round trips.
-     *
-     * @param partition object uniquely identifying the partition of data
-     * @return object uniquely identifying the offset in the partition of data
-     */
-    <T> Map<String, Object> offset(Map<String, T> partition);
-
-    /**
-     * <p>
-     * Get a set of offsets for the specified partition identifiers. This may be more efficient
-     * than calling {@link #offset(Map)} repeatedly.
-     * </p>
-     * <p>
-     * Note that when errors occur, this method omits the associated data and tries to return as
-     * many of the requested values as possible. This allows a task that's managing many partitions to
-     * still proceed with any available data. Therefore, implementations should take care to check
-     * that the data is actually available in the returned response. The only case when an
-     * exception will be thrown is if the entire request failed, e.g. because the underlying
-     * storage was unavailable.
-     * </p>
-     *
-     * @param partitions set of identifiers for partitions of data
-     * @return a map of partition identifiers to decoded offsets
-     */
-    <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java
deleted file mode 100644
index 8d708f8..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
-import org.apache.kafka.copycat.errors.DataException;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * {@link Converter} implementation that only supports serializing to strings. When converting Copycat data to bytes,
- * the schema will be ignored and {@link Object#toString()} will always be invoked to convert the data to a String.
- * When converting from bytes to Copycat format, the converter will only ever return an optional string schema and
- * a string or null.
- *
- * Encoding configuration is identical to {@link StringSerializer} and {@link StringDeserializer}, but for convenience
- * this class can also be configured to use the same encoding for both encoding and decoding with the converter.encoding
- * setting.
- */
-public class StringConverter implements Converter {
-    private final StringSerializer serializer = new StringSerializer();
-    private final StringDeserializer deserializer = new StringDeserializer();
-
-    public StringConverter() {
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        Map<String, Object> serializerConfigs = new HashMap<>();
-        serializerConfigs.putAll(configs);
-        Map<String, Object> deserializerConfigs = new HashMap<>();
-        deserializerConfigs.putAll(configs);
-
-        Object encodingValue = configs.get("converter.encoding");
-        if (encodingValue != null) {
-            serializerConfigs.put("serializer.encoding", encodingValue);
-            deserializerConfigs.put("deserializer.encoding", encodingValue);
-        }
-
-        serializer.configure(serializerConfigs, isKey);
-        deserializer.configure(deserializerConfigs, isKey);
-    }
-
-    @Override
-    public byte[] fromCopycatData(String topic, Schema schema, Object value) {
-        try {
-            return serializer.serialize(topic, value == null ? null : value.toString());
-        } catch (SerializationException e) {
-            throw new DataException("Failed to serialize to a string: ", e);
-        }
-    }
-
-    @Override
-    public SchemaAndValue toCopycatData(String topic, byte[] value) {
-        try {
-            return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, deserializer.deserialize(topic, value));
-        } catch (SerializationException e) {
-            throw new DataException("Failed to deserialize string: ", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java b/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java
deleted file mode 100644
index f9dd53a..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Utilities that connector implementations might find useful. Contains common building blocks
- * for writing connectors.
- */
-@InterfaceStability.Unstable
-public class ConnectorUtils {
-    /**
-     * Given a list of elements and a target number of groups, generates list of groups of
-     * elements to match the target number of groups, spreading them evenly among the groups.
-     * This generates groups with contiguous elements, which results in intuitive ordering if
-     * your elements are also ordered (e.g. alphabetical lists of table names if you sort
-     * table names alphabetically to generate the raw partitions) or can result in efficient
-     * partitioning if elements are sorted according to some criteria that affects performance
-     * (e.g. topic partitions with the same leader).
-     *
-     * @param elements list of elements to partition
-     * @param numGroups the number of output groups to generate.
-     */
-    public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) {
-        if (numGroups <= 0)
-            throw new IllegalArgumentException("Number of groups must be positive.");
-
-        List<List<T>> result = new ArrayList<>(numGroups);
-
-        // Each group has either n+1 or n raw partitions
-        int perGroup = elements.size() / numGroups;
-        int leftover = elements.size() - (numGroups * perGroup);
-
-        int assigned = 0;
-        for (int group = 0; group < numGroups; group++) {
-            int numThisGroup = group < leftover ? perGroup + 1 : perGroup;
-            List<T> groupList = new ArrayList<>(numThisGroup);
-            for (int i = 0; i < numThisGroup; i++) {
-                groupList.add(elements.get(assigned));
-                assigned++;
-            }
-            result.add(groupList);
-        }
-
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
deleted file mode 100644
index 7b1e9eb..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.connector;
-
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-public class ConnectorReconfigurationTest {
-
-    @Test
-    public void testDefaultReconfigure() throws Exception {
-        TestConnector conn = new TestConnector(false);
-        conn.reconfigure(Collections.<String, String>emptyMap());
-        assertEquals(conn.stopOrder, 0);
-        assertEquals(conn.configureOrder, 1);
-    }
-
-    @Test(expected = CopycatException.class)
-    public void testReconfigureStopException() throws Exception {
-        TestConnector conn = new TestConnector(true);
-        conn.reconfigure(Collections.<String, String>emptyMap());
-    }
-
-    private static class TestConnector extends Connector {
-        private boolean stopException;
-        private int order = 0;
-        public int stopOrder = -1;
-        public int configureOrder = -1;
-
-        public TestConnector(boolean stopException) {
-            this.stopException = stopException;
-        }
-
-        @Override
-        public String version() {
-            return "1.0";
-        }
-
-        @Override
-        public void start(Map<String, String> props) {
-            configureOrder = order++;
-        }
-
-        @Override
-        public Class<? extends Task> taskClass() {
-            return null;
-        }
-
-        @Override
-        public List<Map<String, String>> taskConfigs(int count) {
-            return null;
-        }
-
-        @Override
-        public void stop() {
-            stopOrder = order++;
-            if (stopException)
-                throw new CopycatException("error");
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
deleted file mode 100644
index 4976950..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-import org.junit.Test;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNull;
-
-public class CopycatSchemaTest {
-    private static final Schema MAP_INT_STRING_SCHEMA = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build();
-    private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct()
-            .field("field", Schema.INT32_SCHEMA)
-            .build();
-    private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct()
-            .field("first", Schema.INT32_SCHEMA)
-            .field("second", Schema.STRING_SCHEMA)
-            .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
-            .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build())
-            .field("nested", FLAT_STRUCT_SCHEMA)
-            .build();
-    private static final Schema PARENT_STRUCT_SCHEMA = SchemaBuilder.struct()
-            .field("nested", FLAT_STRUCT_SCHEMA)
-            .build();
-
-    @Test
-    public void testFieldsOnStructSchema() {
-        Schema schema = SchemaBuilder.struct()
-                .field("foo", Schema.BOOLEAN_SCHEMA)
-                .field("bar", Schema.INT32_SCHEMA)
-                .build();
-
-        assertEquals(2, schema.fields().size());
-        // Validate field lookup by name
-        Field foo = schema.field("foo");
-        assertEquals(0, foo.index());
-        Field bar = schema.field("bar");
-        assertEquals(1, bar.index());
-        // Any other field name should fail
-        assertNull(schema.field("other"));
-    }
-
-
-    @Test(expected = DataException.class)
-    public void testFieldsOnlyValidForStructs() {
-        Schema.INT8_SCHEMA.fields();
-    }
-
-    @Test
-    public void testValidateValueMatchingType() {
-        CopycatSchema.validateValue(Schema.INT8_SCHEMA, (byte) 1);
-        CopycatSchema.validateValue(Schema.INT16_SCHEMA, (short) 1);
-        CopycatSchema.validateValue(Schema.INT32_SCHEMA, 1);
-        CopycatSchema.validateValue(Schema.INT64_SCHEMA, (long) 1);
-        CopycatSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.f);
-        CopycatSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.);
-        CopycatSchema.validateValue(Schema.BOOLEAN_SCHEMA, true);
-        CopycatSchema.validateValue(Schema.STRING_SCHEMA, "a string");
-        CopycatSchema.validateValue(Schema.BYTES_SCHEMA, "a byte array".getBytes());
-        CopycatSchema.validateValue(Schema.BYTES_SCHEMA, ByteBuffer.wrap("a byte array".getBytes()));
-        CopycatSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3));
-        CopycatSchema.validateValue(
-                SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build(),
-                Collections.singletonMap(1, "value")
-        );
-        // Struct tests the basic struct layout + complex field types + nested structs
-        Struct structValue = new Struct(STRUCT_SCHEMA)
-                .put("first", 1)
-                .put("second", "foo")
-                .put("array", Arrays.asList(1, 2, 3))
-                .put("map", Collections.singletonMap(1, "value"))
-                .put("nested", new Struct(FLAT_STRUCT_SCHEMA).put("field", 12));
-        CopycatSchema.validateValue(STRUCT_SCHEMA, structValue);
-    }
-
-    @Test
-    public void testValidateValueMatchingLogicalType() {
-        CopycatSchema.validateValue(Decimal.schema(2), new BigDecimal(new BigInteger("156"), 2));
-        CopycatSchema.validateValue(Date.SCHEMA, new java.util.Date(0));
-        CopycatSchema.validateValue(Time.SCHEMA, new java.util.Date(0));
-        CopycatSchema.validateValue(Timestamp.SCHEMA, new java.util.Date(0));
-    }
-
-    // To avoid requiring excessive numbers of tests, these checks for invalid types use a similar type where possible
-    // to only include a single test for each type
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchInt8() {
-        CopycatSchema.validateValue(Schema.INT8_SCHEMA, 1);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchInt16() {
-        CopycatSchema.validateValue(Schema.INT16_SCHEMA, 1);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchInt32() {
-        CopycatSchema.validateValue(Schema.INT32_SCHEMA, (long) 1);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchInt64() {
-        CopycatSchema.validateValue(Schema.INT64_SCHEMA, 1);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchFloat() {
-        CopycatSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.0);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchDouble() {
-        CopycatSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.f);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchBoolean() {
-        CopycatSchema.validateValue(Schema.BOOLEAN_SCHEMA, 1.f);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchString() {
-        // CharSequence is a similar type (supertype of String), but we restrict to String.
-        CharBuffer cbuf = CharBuffer.wrap("abc");
-        CopycatSchema.validateValue(Schema.STRING_SCHEMA, cbuf);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchBytes() {
-        CopycatSchema.validateValue(Schema.BYTES_SCHEMA, new Object[]{1, "foo"});
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchArray() {
-        CopycatSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList("a", "b", "c"));
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchArraySomeMatch() {
-        // Even if some match the right type, this should fail if any mismatch. In this case, type erasure loses
-        // the fact that the list is actually List<Object>, but we couldn't tell if only checking the first element
-        CopycatSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, "c"));
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchMapKey() {
-        CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap("wrong key type", "value"));
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchMapValue() {
-        CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap(1, 2));
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchMapSomeKeys() {
-        Map<Object, String> data = new HashMap<>();
-        data.put(1, "abc");
-        data.put("wrong", "it's as easy as one two three");
-        CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, data);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchMapSomeValues() {
-        Map<Integer, Object> data = new HashMap<>();
-        data.put(1, "abc");
-        data.put(2, "wrong".getBytes());
-        CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, data);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchStructWrongSchema() {
-        // Completely mismatching schemas
-        CopycatSchema.validateValue(
-                FLAT_STRUCT_SCHEMA,
-                new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1)
-        );
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchStructWrongNestedSchema() {
-        // Top-level schema  matches, but nested does not.
-        CopycatSchema.validateValue(
-                PARENT_STRUCT_SCHEMA,
-                new Struct(PARENT_STRUCT_SCHEMA)
-                        .put("nested", new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1))
-        );
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchDecimal() {
-        CopycatSchema.validateValue(Decimal.schema(2), new BigInteger("156"));
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchDate() {
-        CopycatSchema.validateValue(Date.SCHEMA, 1000L);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchTime() {
-        CopycatSchema.validateValue(Time.SCHEMA, 1000L);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchTimestamp() {
-        CopycatSchema.validateValue(Timestamp.SCHEMA, 1000L);
-    }
-
-    @Test
-    public void testPrimitiveEquality() {
-        // Test that primitive types, which only need to consider all the type & metadata fields, handle equality correctly
-        CopycatSchema s1 = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc");
-        CopycatSchema s2 = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc");
-        CopycatSchema differentType = new CopycatSchema(Schema.Type.INT16, false, null, "name", 2, "doc");
-        CopycatSchema differentOptional = new CopycatSchema(Schema.Type.INT8, true, null, "name", 2, "doc");
-        CopycatSchema differentDefault = new CopycatSchema(Schema.Type.INT8, false, true, "name", 2, "doc");
-        CopycatSchema differentName = new CopycatSchema(Schema.Type.INT8, false, null, "otherName", 2, "doc");
-        CopycatSchema differentVersion = new CopycatSchema(Schema.Type.INT8, false, null, "name", 4, "doc");
-        CopycatSchema differentDoc = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "other doc");
-        CopycatSchema differentParameters = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc", Collections.singletonMap("param", "value"), null, null, null);
-
-        assertEquals(s1, s2);
-        assertNotEquals(s1, differentType);
-        assertNotEquals(s1, differentOptional);
-        assertNotEquals(s1, differentDefault);
-        assertNotEquals(s1, differentName);
-        assertNotEquals(s1, differentVersion);
-        assertNotEquals(s1, differentDoc);
-        assertNotEquals(s1, differentParameters);
-    }
-
-    @Test
-    public void testArrayEquality() {
-        // Validate that the value type for the array is tested for equality. This test makes sure the same schema object is
-        // never reused to ensure we're actually checking equality
-        CopycatSchema s1 = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int8().build());
-        CopycatSchema s2 = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int8().build());
-        CopycatSchema differentValueSchema = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int16().build());
-
-        assertEquals(s1, s2);
-        assertNotEquals(s1, differentValueSchema);
-    }
-
-    @Test
-    public void testMapEquality() {
-        // Same as testArrayEquality, but for both key and value schemas
-        CopycatSchema s1 = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
-        CopycatSchema s2 = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
-        CopycatSchema differentKeySchema = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.string().build(), SchemaBuilder.int16().build());
-        CopycatSchema differentValueSchema = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.string().build());
-
-        assertEquals(s1, s2);
-        assertNotEquals(s1, differentKeySchema);
-        assertNotEquals(s1, differentValueSchema);
-    }
-
-    @Test
-    public void testStructEquality() {
-        // Same as testArrayEquality, but checks differences in fields. Only does a simple check, relying on tests of
-        // Field's equals() method to validate all variations in the list of fields will be checked
-        CopycatSchema s1 = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
-                Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
-                        new Field("field2", 1, SchemaBuilder.int16().build())), null, null);
-        CopycatSchema s2 = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
-                Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
-                        new Field("field2", 1, SchemaBuilder.int16().build())), null, null);
-        CopycatSchema differentField = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
-                Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
-                        new Field("different field name", 1, SchemaBuilder.int16().build())), null, null);
-
-        assertEquals(s1, s2);
-        assertNotEquals(s1, differentField);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/test/java/org/apache/kafka/copycat/data/DateTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/DateTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/DateTest.java
deleted file mode 100644
index e7885ab..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/DateTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-import org.junit.Test;
-
-import java.util.Calendar;
-import java.util.GregorianCalendar;
-import java.util.TimeZone;
-
-import static org.junit.Assert.assertEquals;
-
-public class DateTest {
-    private static final GregorianCalendar EPOCH;
-    private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_DAYS;
-    private static final GregorianCalendar EPOCH_PLUS_TIME_COMPONENT;
-    static {
-        EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
-        EPOCH.setTimeZone(TimeZone.getTimeZone("UTC"));
-
-        EPOCH_PLUS_TIME_COMPONENT = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 1);
-        EPOCH_PLUS_TIME_COMPONENT.setTimeZone(TimeZone.getTimeZone("UTC"));
-
-        EPOCH_PLUS_TEN_THOUSAND_DAYS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
-        EPOCH_PLUS_TEN_THOUSAND_DAYS.setTimeZone(TimeZone.getTimeZone("UTC"));
-        EPOCH_PLUS_TEN_THOUSAND_DAYS.add(Calendar.DATE, 10000);
-    }
-
-    @Test
-    public void testBuilder() {
-        Schema plain = Date.SCHEMA;
-        assertEquals(Date.LOGICAL_NAME, plain.name());
-        assertEquals(1, (Object) plain.version());
-    }
-
-    @Test
-    public void testFromLogical() {
-        assertEquals(0, Date.fromLogical(Date.SCHEMA, EPOCH.getTime()));
-        assertEquals(10000, Date.fromLogical(Date.SCHEMA, EPOCH_PLUS_TEN_THOUSAND_DAYS.getTime()));
-    }
-
-    @Test(expected = DataException.class)
-    public void testFromLogicalInvalidSchema() {
-        Date.fromLogical(Date.builder().name("invalid").build(), EPOCH.getTime());
-    }
-
-    @Test(expected = DataException.class)
-    public void testFromLogicalInvalidHasTimeComponents() {
-        Date.fromLogical(Date.SCHEMA, EPOCH_PLUS_TIME_COMPONENT.getTime());
-    }
-
-    @Test
-    public void testToLogical() {
-        assertEquals(EPOCH.getTime(), Date.toLogical(Date.SCHEMA, 0));
-        assertEquals(EPOCH_PLUS_TEN_THOUSAND_DAYS.getTime(), Date.toLogical(Date.SCHEMA, 10000));
-    }
-
-    @Test(expected = DataException.class)
-    public void testToLogicalInvalidSchema() {
-        Date.toLogical(Date.builder().name("invalid").build(), 0);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/test/java/org/apache/kafka/copycat/data/DecimalTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/DecimalTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/DecimalTest.java
deleted file mode 100644
index ce71161..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/DecimalTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.junit.Test;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.Collections;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-public class DecimalTest {
-    private static final int TEST_SCALE = 2;
-    private static final BigDecimal TEST_DECIMAL = new BigDecimal(new BigInteger("156"), TEST_SCALE);
-    private static final BigDecimal TEST_DECIMAL_NEGATIVE = new BigDecimal(new BigInteger("-156"), TEST_SCALE);
-    private static final byte[] TEST_BYTES = new byte[]{0, -100};
-    private static final byte[] TEST_BYTES_NEGATIVE = new byte[]{-1, 100};
-
-    @Test
-    public void testBuilder() {
-        Schema plain = Decimal.builder(2).build();
-        assertEquals(Decimal.LOGICAL_NAME, plain.name());
-        assertEquals(Collections.singletonMap(Decimal.SCALE_FIELD, "2"), plain.parameters());
-        assertEquals(1, (Object) plain.version());
-    }
-
-    @Test
-    public void testFromLogical() {
-        Schema schema = Decimal.schema(TEST_SCALE);
-        byte[] encoded = Decimal.fromLogical(schema, TEST_DECIMAL);
-        assertArrayEquals(TEST_BYTES, encoded);
-
-        encoded = Decimal.fromLogical(schema, TEST_DECIMAL_NEGATIVE);
-        assertArrayEquals(TEST_BYTES_NEGATIVE, encoded);
-    }
-
-    @Test
-    public void testToLogical() {
-        Schema schema = Decimal.schema(2);
-        BigDecimal converted = Decimal.toLogical(schema, TEST_BYTES);
-        assertEquals(TEST_DECIMAL, converted);
-
-        converted = Decimal.toLogical(schema, TEST_BYTES_NEGATIVE);
-        assertEquals(TEST_DECIMAL_NEGATIVE, converted);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java
deleted file mode 100644
index d5458bc..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-public class FieldTest {
-
-    @Test
-    public void testEquality() {
-        Field field1 = new Field("name", 0, Schema.INT8_SCHEMA);
-        Field field2 = new Field("name", 0, Schema.INT8_SCHEMA);
-        Field differentName = new Field("name2", 0, Schema.INT8_SCHEMA);
-        Field differentIndex = new Field("name", 1, Schema.INT8_SCHEMA);
-        Field differentSchema = new Field("name", 0, Schema.INT16_SCHEMA);
-
-        assertEquals(field1, field2);
-        assertNotEquals(field1, differentName);
-        assertNotEquals(field1, differentIndex);
-        assertNotEquals(field1, differentSchema);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java
deleted file mode 100644
index 183f5fc..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.SchemaBuilderException;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-public class SchemaBuilderTest {
-    private static final String NAME = "name";
-    private static final Integer VERSION = 2;
-    private static final String DOC = "doc";
-    private static final Map<String, String> NO_PARAMS = null;
-
-    @Test
-    public void testInt8Builder() {
-        Schema schema = SchemaBuilder.int8().build();
-        assertTypeAndDefault(schema, Schema.Type.INT8, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.int8().name(NAME).optional().defaultValue((byte) 12)
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.INT8, true, (byte) 12);
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testInt8BuilderInvalidDefault() {
-        SchemaBuilder.int8().defaultValue("invalid");
-    }
-
-    @Test
-    public void testInt16Builder() {
-        Schema schema = SchemaBuilder.int16().build();
-        assertTypeAndDefault(schema, Schema.Type.INT16, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.int16().name(NAME).optional().defaultValue((short) 12)
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.INT16, true, (short) 12);
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testInt16BuilderInvalidDefault() {
-        SchemaBuilder.int16().defaultValue("invalid");
-    }
-
-    @Test
-    public void testInt32Builder() {
-        Schema schema = SchemaBuilder.int32().build();
-        assertTypeAndDefault(schema, Schema.Type.INT32, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.int32().name(NAME).optional().defaultValue(12)
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.INT32, true, 12);
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testInt32BuilderInvalidDefault() {
-        SchemaBuilder.int32().defaultValue("invalid");
-    }
-
-    @Test
-    public void testInt64Builder() {
-        Schema schema = SchemaBuilder.int64().build();
-        assertTypeAndDefault(schema, Schema.Type.INT64, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.int64().name(NAME).optional().defaultValue((long) 12)
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.INT64, true, (long) 12);
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testInt64BuilderInvalidDefault() {
-        SchemaBuilder.int64().defaultValue("invalid");
-    }
-
-    @Test
-    public void testFloatBuilder() {
-        Schema schema = SchemaBuilder.float32().build();
-        assertTypeAndDefault(schema, Schema.Type.FLOAT32, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.float32().name(NAME).optional().defaultValue(12.f)
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.FLOAT32, true, 12.f);
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testFloatBuilderInvalidDefault() {
-        SchemaBuilder.float32().defaultValue("invalid");
-    }
-
-    @Test
-    public void testDoubleBuilder() {
-        Schema schema = SchemaBuilder.float64().build();
-        assertTypeAndDefault(schema, Schema.Type.FLOAT64, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.float64().name(NAME).optional().defaultValue(12.0)
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.FLOAT64, true, 12.0);
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testDoubleBuilderInvalidDefault() {
-        SchemaBuilder.float64().defaultValue("invalid");
-    }
-
-    @Test
-    public void testBooleanBuilder() {
-        Schema schema = SchemaBuilder.bool().build();
-        assertTypeAndDefault(schema, Schema.Type.BOOLEAN, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.bool().name(NAME).optional().defaultValue(true)
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.BOOLEAN, true, true);
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testBooleanBuilderInvalidDefault() {
-        SchemaBuilder.bool().defaultValue("invalid");
-    }
-
-    @Test
-    public void testStringBuilder() {
-        Schema schema = SchemaBuilder.string().build();
-        assertTypeAndDefault(schema, Schema.Type.STRING, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.string().name(NAME).optional().defaultValue("a default string")
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.STRING, true, "a default string");
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testStringBuilderInvalidDefault() {
-        SchemaBuilder.string().defaultValue(true);
-    }
-
-    @Test
-    public void testBytesBuilder() {
-        Schema schema = SchemaBuilder.bytes().build();
-        assertTypeAndDefault(schema, Schema.Type.BYTES, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.bytes().name(NAME).optional().defaultValue("a default byte array".getBytes())
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.BYTES, true, "a default byte array".getBytes());
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testBytesBuilderInvalidDefault() {
-        SchemaBuilder.bytes().defaultValue("a string, not bytes");
-    }
-
-
-    @Test
-    public void testParameters() {
-        Map<String, String> expectedParameters = new HashMap<>();
-        expectedParameters.put("foo", "val");
-        expectedParameters.put("bar", "baz");
-
-        Schema schema = SchemaBuilder.string().parameter("foo", "val").parameter("bar", "baz").build();
-        assertTypeAndDefault(schema, Schema.Type.STRING, false, null);
-        assertMetadata(schema, null, null, null, expectedParameters);
-
-        schema = SchemaBuilder.string().parameters(expectedParameters).build();
-        assertTypeAndDefault(schema, Schema.Type.STRING, false, null);
-        assertMetadata(schema, null, null, null, expectedParameters);
-    }
-
-
-    @Test
-    public void testStructBuilder() {
-        Schema schema = SchemaBuilder.struct()
-                .field("field1", Schema.INT8_SCHEMA)
-                .field("field2", Schema.INT8_SCHEMA)
-                .build();
-        assertTypeAndDefault(schema, Schema.Type.STRUCT, false, null);
-        assertEquals(2, schema.fields().size());
-        assertEquals("field1", schema.fields().get(0).name());
-        assertEquals(0, schema.fields().get(0).index());
-        assertEquals(Schema.INT8_SCHEMA, schema.fields().get(0).schema());
-        assertEquals("field2", schema.fields().get(1).name());
-        assertEquals(1, schema.fields().get(1).index());
-        assertEquals(Schema.INT8_SCHEMA, schema.fields().get(1).schema());
-        assertNoMetadata(schema);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testNonStructCantHaveFields() {
-        SchemaBuilder.int8().field("field", SchemaBuilder.int8().build());
-    }
-
-
-    @Test
-    public void testArrayBuilder() {
-        Schema schema = SchemaBuilder.array(Schema.INT8_SCHEMA).build();
-        assertTypeAndDefault(schema, Schema.Type.ARRAY, false, null);
-        assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
-        assertNoMetadata(schema);
-
-        // Default value
-        List<Byte> defArray = Arrays.asList((byte) 1, (byte) 2);
-        schema = SchemaBuilder.array(Schema.INT8_SCHEMA).defaultValue(defArray).build();
-        assertTypeAndDefault(schema, Schema.Type.ARRAY, false, defArray);
-        assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
-        assertNoMetadata(schema);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testArrayBuilderInvalidDefault() {
-        // Array, but wrong embedded type
-        SchemaBuilder.array(Schema.INT8_SCHEMA).defaultValue(Arrays.asList("string")).build();
-    }
-
-    @Test
-    public void testMapBuilder() {
-        Schema schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA).build();
-        assertTypeAndDefault(schema, Schema.Type.MAP, false, null);
-        assertEquals(schema.keySchema(), Schema.INT8_SCHEMA);
-        assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
-        assertNoMetadata(schema);
-
-        // Default value
-        Map<Byte, Byte> defMap = Collections.singletonMap((byte) 5, (byte) 10);
-        schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA)
-                .defaultValue(defMap).build();
-        assertTypeAndDefault(schema, Schema.Type.MAP, false, defMap);
-        assertEquals(schema.keySchema(), Schema.INT8_SCHEMA);
-        assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
-        assertNoMetadata(schema);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testMapBuilderInvalidDefault() {
-        // Map, but wrong embedded type
-        Map<Byte, String> defMap = Collections.singletonMap((byte) 5, "foo");
-        SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA)
-                .defaultValue(defMap).build();
-    }
-
-
-
-    private void assertTypeAndDefault(Schema schema, Schema.Type type, boolean optional, Object defaultValue) {
-        assertEquals(type, schema.type());
-        assertEquals(optional, schema.isOptional());
-        if (type == Schema.Type.BYTES) {
-            // byte[] is not comparable, need to wrap to check correctly
-            if (defaultValue == null)
-                assertNull(schema.defaultValue());
-            else
-                assertEquals(ByteBuffer.wrap((byte[]) defaultValue), ByteBuffer.wrap((byte[]) schema.defaultValue()));
-        } else {
-            assertEquals(defaultValue, schema.defaultValue());
-        }
-    }
-
-    private void assertMetadata(Schema schema, String name, Integer version, String doc, Map<String, String> parameters) {
-        assertEquals(name, schema.name());
-        assertEquals(version, schema.version());
-        assertEquals(doc, schema.doc());
-        assertEquals(parameters, schema.parameters());
-    }
-
-    private void assertNoMetadata(Schema schema) {
-        assertMetadata(schema, null, null, null, null);
-    }
-}


Mime
View raw message