kafka-commits mailing list archives

From gwens...@apache.org
Subject [1/5] kafka git commit: KAFKA-2367; Add Copycat runtime data API.
Date Thu, 27 Aug 2015 18:58:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8c88d198a -> 492bfdfa8


http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
index 7a050dc..237eda6 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
@@ -19,6 +19,7 @@ package org.apache.kafka.copycat.storage;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,17 +57,17 @@ public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
     }
 
     @Override
-    public Object getOffset(Object partition) {
-        return getOffsets(Arrays.asList(partition)).get(partition);
+    public SchemaAndValue offset(SchemaAndValue partition) {
+        return offsets(Arrays.asList(partition)).get(partition);
     }
 
     @Override
-    public Map<Object, Object> getOffsets(Collection<Object> partitions) {
+    public Map<SchemaAndValue, SchemaAndValue> offsets(Collection<SchemaAndValue> partitions) {
         // Serialize keys so backing store can work with them
-        Map<ByteBuffer, Object> serializedToOriginal = new HashMap<>(partitions.size());
-        for (Object key : partitions) {
+        Map<ByteBuffer, SchemaAndValue> serializedToOriginal = new HashMap<>(partitions.size());
+        for (SchemaAndValue key : partitions) {
             try {
-                byte[] keySerialized = keySerializer.serialize(namespace, keyConverter.fromCopycatData(key));
+                byte[] keySerialized = keySerializer.serialize(namespace, keyConverter.fromCopycatData(key.schema(), key.value()));
                 ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
                 serializedToOriginal.put(keyBuffer, key);
             } catch (Throwable t) {
@@ -86,7 +87,7 @@ public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
         }
 
         // Deserialize all the values and map back to the original keys
-        Map<Object, Object> result = new HashMap<>(partitions.size());
+        Map<SchemaAndValue, SchemaAndValue> result = new HashMap<>(partitions.size());
         for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
             try {
                 // Since null could be a valid key, explicitly check whether map contains the key
@@ -95,8 +96,8 @@ public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
                             + "store may have returned invalid data", rawEntry.getKey());
                     continue;
                 }
-                Object origKey = serializedToOriginal.get(rawEntry.getKey());
-                Object deserializedValue = valueConverter.toCopycatData(
+                SchemaAndValue origKey = serializedToOriginal.get(rawEntry.getKey());
+                SchemaAndValue deserializedValue = valueConverter.toCopycatData(
                         valueDeserializer.deserialize(namespace, rawEntry.getValue().array())
                 );
 

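For reference, the reader half of the offset API now speaks SchemaAndValue on both the partition and the offset side. A minimal usage sketch based only on the signatures visible in this diff; the import location of the OffsetStorageReader interface and the string-typed source-file partition key are illustrative assumptions, not part of this commit:

    import org.apache.kafka.copycat.data.Schema;
    import org.apache.kafka.copycat.data.SchemaAndValue;
    import org.apache.kafka.copycat.storage.OffsetStorageReader;

    public class OffsetLookupSketch {
        // Look up the last committed offset for a single source partition.
        // offset() replaces the old getOffset(Object); both sides now carry a schema.
        static SchemaAndValue lastOffsetFor(OffsetStorageReader reader, String sourceFile) {
            SchemaAndValue partition = new SchemaAndValue(Schema.STRING_SCHEMA, sourceFile);
            return reader.offset(partition);
        }
    }
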
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
index c6e829c..4fb75e7 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.copycat.storage;
 
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.util.Callback;
 import org.slf4j.Logger;
@@ -73,10 +74,10 @@ public class OffsetStorageWriter<K, V> {
     private final Serializer<V> valueSerializer;
     private final String namespace;
     // Offset data in Copycat format
-    private Map<Object, Object> data = new HashMap<>();
+    private Map<SchemaAndValue, SchemaAndValue> data = new HashMap<>();
 
     // Not synchronized, should only be accessed by flush thread
-    private Map<Object, Object> toFlush = null;
+    private Map<SchemaAndValue, SchemaAndValue> toFlush = null;
     // Unique ID for each flush request to handle callbacks after timeouts
     private long currentFlushId = 0;
 
@@ -96,7 +97,7 @@ public class OffsetStorageWriter<K, V> {
      * @param partition the partition to store an offset for
      * @param offset the offset
      */
-    public synchronized void setOffset(Object partition, Object offset) {
+    public synchronized void offset(SchemaAndValue partition, SchemaAndValue offset) {
         data.put(partition, offset);
     }
 
@@ -141,10 +142,10 @@ public class OffsetStorageWriter<K, V> {
         Map<ByteBuffer, ByteBuffer> offsetsSerialized;
         try {
             offsetsSerialized = new HashMap<>();
-            for (Map.Entry<Object, Object> entry : toFlush.entrySet()) {
-                byte[] key = keySerializer.serialize(namespace, keyConverter.fromCopycatData(entry.getKey()));
+            for (Map.Entry<SchemaAndValue, SchemaAndValue> entry : toFlush.entrySet()) {
+                byte[] key = keySerializer.serialize(namespace, keyConverter.fromCopycatData(entry.getKey().schema(), entry.getKey().value()));
                 ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
-                byte[] value = valueSerializer.serialize(namespace, valueConverter.fromCopycatData(entry.getValue()));
+                byte[] value = valueSerializer.serialize(namespace, valueConverter.fromCopycatData(entry.getValue().schema(), entry.getValue().value()));
                 ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
                 offsetsSerialized.put(keyBuffer, valueBuffer);
             }

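On the writer side, setOffset(Object, Object) becomes offset(SchemaAndValue, SchemaAndValue), so the schema travels with the data all the way into the converter during doFlush(). A small sketch under the same caveat; the file/position payload types are made up for the example:

    import org.apache.kafka.copycat.data.Schema;
    import org.apache.kafka.copycat.data.SchemaAndValue;
    import org.apache.kafka.copycat.storage.OffsetStorageWriter;

    public class OffsetWriteSketch {
        // Record the current position for one source partition; only the
        // offset(partition, offset) signature comes from this commit.
        static void recordPosition(OffsetStorageWriter<?, ?> writer, String file, long position) {
            writer.offset(new SchemaAndValue(Schema.STRING_SCHEMA, file),
                          new SchemaAndValue(Schema.INT64_SCHEMA, position));
        }
    }
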
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
index 44a9e41..683c634 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
@@ -32,11 +32,11 @@ public class ConnectorTaskId implements Serializable {
         this.task = task;
     }
 
-    public String getConnector() {
+    public String connector() {
         return connector;
     }
 
-    public int getTask() {
+    public int task() {
         return task;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
index 278fdd3..db9f2c4 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
@@ -57,17 +57,17 @@ public class FutureCallback<T> implements Callback<T>, Future<T> {
     @Override
     public T get() throws InterruptedException, ExecutionException {
         finishedLatch.await();
-        return getResult();
+        return result();
     }
 
     @Override
     public T get(long l, TimeUnit timeUnit)
             throws InterruptedException, ExecutionException, TimeoutException {
         finishedLatch.await(l, timeUnit);
-        return getResult();
+        return result();
     }
 
-    private T getResult() throws ExecutionException {
+    private T result() throws ExecutionException {
         if (exception != null) {
             throw new ExecutionException(exception);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index e5286e3..54e9bc6 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.sink.SinkRecord;
 import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.sink.SinkTaskContext;
@@ -52,7 +54,9 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     private static final String TOPIC = "test";
     private static final int PARTITION = 12;
     private static final long FIRST_OFFSET = 45;
+    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
     private static final int KEY = 12;
+    private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
     private static final String VALUE = "VALUE";
     private static final byte[] RAW_KEY = "key".getBytes();
     private static final byte[] RAW_VALUE = "value".getBytes();
@@ -118,7 +122,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
             assertEquals(1, recs.size());
             for (SinkRecord rec : recs) {
                 SinkRecord referenceSinkRecord
-                        = new SinkRecord(TOPIC, PARTITION, KEY, VALUE, FIRST_OFFSET + offset);
+                        = new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY, VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset);
                 assertEquals(referenceSinkRecord, rec);
                 offset++;
             }
@@ -130,7 +134,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     @Test
     public void testDeliverConvertsData() throws Exception {
         // Validate conversion is performed when data is delivered
-        Integer record = 12;
+        SchemaAndValue record = new SchemaAndValue(Schema.INT32_SCHEMA, 12);
 
         ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
                 Collections.singletonMap(
@@ -148,8 +152,10 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         PowerMock.replayAll();
 
         Whitebox.invokeMethod(workerTask, "deliverMessages", records);
-        assertEquals(record, capturedRecords.getValue().iterator().next().getKey());
-        assertEquals(record, capturedRecords.getValue().iterator().next().getValue());
+        assertEquals(record.schema(), capturedRecords.getValue().iterator().next().keySchema());
+        assertEquals(record.value(), capturedRecords.getValue().iterator().next().key());
+        assertEquals(record.schema(), capturedRecords.getValue().iterator().next().valueSchema());
+        assertEquals(record.value(), capturedRecords.getValue().iterator().next().value());
 
         PowerMock.verifyAll();
     }
@@ -173,7 +179,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         // Second triggers commit, gets a second offset
         workerThread.iteration();
         // Commit finishes synchronously for testing so we can check this immediately
-        assertEquals(0, workerThread.getCommitFailures());
+        assertEquals(0, workerThread.commitFailures());
         workerTask.stop();
         workerTask.close();
 
@@ -198,7 +204,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         // Second iteration triggers commit
         workerThread.iteration();
         workerThread.iteration();
-        assertEquals(1, workerThread.getCommitFailures());
+        assertEquals(1, workerThread.commitFailures());
         assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
         workerTask.stop();
         workerTask.close();
@@ -223,7 +229,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         workerThread.iteration();
         workerThread.iteration();
         // TODO Response to consistent failures?
-        assertEquals(1, workerThread.getCommitFailures());
+        assertEquals(1, workerThread.commitFailures());
         assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
         workerTask.stop();
         workerTask.close();
@@ -252,7 +258,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         workerThread.iteration();
         workerThread.iteration();
         // TODO Response to consistent failures?
-        assertEquals(1, workerThread.getCommitFailures());
+        assertEquals(1, workerThread.commitFailures());
         assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
         workerTask.stop();
         workerTask.close();
@@ -314,8 +320,8 @@ public class WorkerSinkTaskTest extends ThreadedTest {
                         return records;
                     }
                 });
-        EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(KEY).anyTimes();
-        EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(VALUE).anyTimes();
+        EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
+        EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
         Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
         sinkTask.put(EasyMock.capture(capturedRecords));
         EasyMock.expectLastCall().anyTimes();

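The test changes above follow the new SinkRecord shape: key and value are each paired with an explicit Schema. Roughly, the reference record the test builds would look like this with the constants inlined (a sketch, not part of the commit):

    import org.apache.kafka.copycat.data.Schema;
    import org.apache.kafka.copycat.sink.SinkRecord;

    public class SinkRecordSketch {
        // topic, kafka partition, key schema + key, value schema + value, consumed offset
        static SinkRecord sample() {
            return new SinkRecord("test", 12,
                    Schema.INT32_SCHEMA, 12,
                    Schema.STRING_SCHEMA, "VALUE",
                    45L);
        }
    }
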
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
index 60e1462..018aa94 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
@@ -23,6 +23,8 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.source.SourceRecord;
 import org.apache.kafka.copycat.source.SourceTask;
 import org.apache.kafka.copycat.source.SourceTaskContext;
@@ -57,11 +59,15 @@ import static org.junit.Assert.*;
 
 @RunWith(PowerMockRunner.class)
 public class WorkerSourceTaskTest extends ThreadedTest {
+    private static final Schema PARTITION_SCHEMA = Schema.BYTES_SCHEMA;
     private static final byte[] PARTITION_BYTES = "partition".getBytes();
+    private static final Schema OFFSET_SCHEMA = Schema.BYTES_SCHEMA;
     private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
 
     // Copycat-format data
+    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
     private static final Integer KEY = -1;
+    private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
     private static final Long RECORD = 12L;
     // Native-formatted data. The actual format of this data doesn't matter -- we just want to see that the right version
     // is used in the right place.
@@ -83,7 +89,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
     private static final Properties EMPTY_TASK_PROPS = new Properties();
     private static final List<SourceRecord> RECORDS = Arrays.asList(
-            new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD)
+            new SourceRecord(PARTITION_SCHEMA, PARTITION_BYTES, OFFSET_SCHEMA, OFFSET_BYTES, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
     );
 
     @Override
@@ -195,7 +201,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         List<SourceRecord> records = new ArrayList<>();
         // Can just use the same record for key and value
-        records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD));
+        records.add(new SourceRecord(PARTITION_SCHEMA, PARTITION_BYTES, OFFSET_SCHEMA, OFFSET_BYTES, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
 
         Capture<ProducerRecord<ByteBuffer, String>> sent = expectSendRecord();
 
@@ -228,8 +234,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     }
 
     private Capture<ProducerRecord<ByteBuffer, String>> expectSendRecord() throws InterruptedException {
-        EasyMock.expect(keyConverter.fromCopycatData(KEY)).andStubReturn(CONVERTED_KEY);
-        EasyMock.expect(valueConverter.fromCopycatData(RECORD)).andStubReturn(CONVERTED_RECORD);
+        EasyMock.expect(keyConverter.fromCopycatData(KEY_SCHEMA, KEY)).andStubReturn(CONVERTED_KEY);
+        EasyMock.expect(valueConverter.fromCopycatData(RECORD_SCHEMA, RECORD)).andStubReturn(CONVERTED_RECORD);
 
         Capture<ProducerRecord<ByteBuffer, String>> sent = EasyMock.newCapture();
         // 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
@@ -249,7 +255,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
                     }
                 });
         // 2. Offset data is passed to the offset storage.
-        offsetWriter.setOffset(PARTITION_BYTES, OFFSET_BYTES);
+        offsetWriter.offset(new SchemaAndValue(PARTITION_SCHEMA, PARTITION_BYTES), new SchemaAndValue(OFFSET_SCHEMA, OFFSET_BYTES));
         PowerMock.expectLastCall().anyTimes();
 
         return sent;

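SourceRecord grows the same way: the source partition, source offset, key, and value each carry their own Schema. A sketch mirroring the ten-argument constructor used in this test, with the values copied from the test constants (the null Kafka partition is taken from the test as well):

    import org.apache.kafka.copycat.data.Schema;
    import org.apache.kafka.copycat.source.SourceRecord;

    public class SourceRecordSketch {
        // source partition (schema + value), source offset (schema + value),
        // topic, kafka partition, key (schema + value), value (schema + value)
        static SourceRecord sample() {
            return new SourceRecord(
                    Schema.BYTES_SCHEMA, "partition".getBytes(),
                    Schema.BYTES_SCHEMA, "offset-1".getBytes(),
                    "topic", null,
                    Schema.INT32_SCHEMA, -1,
                    Schema.INT64_SCHEMA, 12L);
        }
    }
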
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
index 5ac7e38..477893b 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
@@ -145,9 +145,9 @@ public class StandaloneHerderTest {
         PowerMock.expectLastCall();
 
         // Just return the connector properties for the individual task we generate by default
-        EasyMock.<Class<? extends Task>>expect(connector.getTaskClass()).andReturn(taskClass);
+        EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(taskClass);
 
-        EasyMock.expect(connector.getTaskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
+        EasyMock.expect(connector.taskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
                 .andReturn(Arrays.asList(taskProps));
         // And we should instantiate the tasks. For a sink task, we should see added properties for
         // the input topic partitions

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
index 3d49f05..9c0c52d 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
@@ -18,6 +18,9 @@
 package org.apache.kafka.copycat.storage;
 
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
+import org.apache.kafka.copycat.data.SchemaBuilder;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.util.Callback;
 import org.easymock.Capture;
@@ -45,7 +48,9 @@ import static org.junit.Assert.assertTrue;
 public class OffsetStorageWriterTest {
     private static final String NAMESPACE = "namespace";
     // Copycat format - any types should be accepted here
+    private static final Schema OFFSET_KEY_SCHEMA = SchemaBuilder.array(Schema.STRING_SCHEMA).build();
     private static final List<String> OFFSET_KEY = Arrays.asList("key", "key");
+    private static final Schema OFFSET_VALUE_SCHEMA = Schema.STRING_SCHEMA;
     private static final String OFFSET_VALUE = "value";
     // Native objects - must match serializer types
     private static final int OFFSET_KEY_CONVERTED = 12;
@@ -87,7 +92,7 @@ public class OffsetStorageWriterTest {
 
         PowerMock.replayAll();
 
-        writer.setOffset(OFFSET_KEY, OFFSET_VALUE);
+        writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
 
         assertTrue(writer.beginFlush());
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
@@ -123,7 +128,7 @@ public class OffsetStorageWriterTest {
 
         PowerMock.replayAll();
 
-        writer.setOffset(OFFSET_KEY, OFFSET_VALUE);
+        writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
         assertTrue(writer.beginFlush());
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         assertTrue(writer.beginFlush());
@@ -143,7 +148,7 @@ public class OffsetStorageWriterTest {
 
         PowerMock.replayAll();
 
-        writer.setOffset(OFFSET_KEY, OFFSET_VALUE);
+        writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
         assertTrue(writer.beginFlush());
         writer.doFlush(callback);
         assertTrue(writer.beginFlush()); // should throw
@@ -155,7 +160,7 @@ public class OffsetStorageWriterTest {
     public void testCancelBeforeAwaitFlush() {
         PowerMock.replayAll();
 
-        writer.setOffset(OFFSET_KEY, OFFSET_VALUE);
+        writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
         assertTrue(writer.beginFlush());
         writer.cancelFlush();
 
@@ -173,7 +178,7 @@ public class OffsetStorageWriterTest {
 
         PowerMock.replayAll();
 
-        writer.setOffset(OFFSET_KEY, OFFSET_VALUE);
+        writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
         assertTrue(writer.beginFlush());
         // Start the flush, then immediately cancel before allowing the mocked store request to finish
         Future<Void> flushFuture = writer.doFlush(callback);
@@ -202,9 +207,9 @@ public class OffsetStorageWriterTest {
     private void expectStore(final Callback<Void> callback,
                              final boolean fail,
                              final CountDownLatch waitForCompletion) {
-        EasyMock.expect(keyConverter.fromCopycatData(OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED);
+        EasyMock.expect(keyConverter.fromCopycatData(OFFSET_KEY_SCHEMA, OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED);
         EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED);
-        EasyMock.expect(valueConverter.fromCopycatData(OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED);
+        EasyMock.expect(valueConverter.fromCopycatData(OFFSET_VALUE_SCHEMA, OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED);
         EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED);
 
         final Capture<Callback<Void>> storeCallback = Capture.newInstance();

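The offset key in this test also shows how composite Copycat values get schemas: SchemaBuilder describes the container type and SchemaAndValue pairs it with the data before it is handed to the writer. A small sketch of the same pattern:

    import java.util.Arrays;

    import org.apache.kafka.copycat.data.Schema;
    import org.apache.kafka.copycat.data.SchemaAndValue;
    import org.apache.kafka.copycat.data.SchemaBuilder;

    public class OffsetKeySchemaSketch {
        // An array-of-string schema paired with a matching List value,
        // the same shape as OFFSET_KEY / OFFSET_KEY_SCHEMA above.
        static SchemaAndValue compositeKey() {
            Schema keySchema = SchemaBuilder.array(Schema.STRING_SCHEMA).build();
            return new SchemaAndValue(keySchema, Arrays.asList("key", "key"));
        }
    }
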
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 27ae98f..9c7fea5 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -15,4 +15,4 @@
 
 apply from: file('scala.gradle')
 include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'log4j-appender',
-        'copycat:data', 'copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file'
+        'copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file'

