kafka-commits mailing list archives

From ewe...@apache.org
Subject [1/3] kafka git commit: KAFKA-3093: Add Connect status tracking API
Date Wed, 24 Feb 2016 06:47:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk aeb9c2adc -> f7d019ed4


http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 3eab095..abb62b9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -426,7 +426,7 @@ public class WorkerCoordinatorTest {
         public int assignedCount = 0;
 
         @Override
-        public void onAssigned(ConnectProtocol.Assignment assignment) {
+        public void onAssigned(ConnectProtocol.Assignment assignment, int generation) {
             this.assignment = assignment;
             assignedCount++;
         }

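The hunk above widens the rebalance callback to report the coordinator generation alongside the assignment. The listener interface itself is not part of this hunk, so the following is a minimal sketch, assuming a hypothetical listener class; only the (assignment, generation) signature comes from the diff.

    import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;

    // Hypothetical listener; the class name and fields are illustrative.
    public class GenerationTrackingListener {
        private ConnectProtocol.Assignment assignment;
        private int generation = -1;

        public void onAssigned(ConnectProtocol.Assignment assignment, int generation) {
            this.assignment = assignment;
            this.generation = generation; // tag later status writes with this generation
        }
    }
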
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 8ef1d24..07d0e3d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -22,10 +22,13 @@ import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.ConnectorStatus;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
 import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
@@ -33,6 +36,7 @@ import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.FutureCallback;
@@ -54,8 +58,8 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
@@ -64,22 +68,25 @@ public class StandaloneHerderTest {
     private static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2");
     private static final String TOPICS_LIST_STR = "topic1,topic2";
     private static final int DEFAULT_MAX_TASKS = 1;
+    private static final String WORKER_ID = "localhost:8083";
 
     private StandaloneHerder herder;
-    @Mock protected Worker worker;
+
     private Connector connector;
+    @Mock protected Worker worker;
     @Mock protected Callback<Herder.Created<ConnectorInfo>> createCallback;
+    @Mock protected StatusBackingStore statusBackingStore;
 
     @Before
     public void setup() {
-        worker = PowerMock.createMock(Worker.class);
-        herder = new StandaloneHerder(worker);
+        herder = new StandaloneHerder(WORKER_ID, worker, statusBackingStore);
     }
 
     @Test
     public void testCreateSourceConnector() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+
         PowerMock.replayAll();
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
@@ -121,6 +128,10 @@ public class StandaloneHerderTest {
     public void testDestroyConnector() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+
+        EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.<TaskStatus>emptyList());
+        statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0));
+
         expectDestroy();
 
         PowerMock.replayAll();
@@ -232,7 +243,8 @@ public class StandaloneHerderTest {
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall();
         Capture<ConnectorConfig> capturedConfig = EasyMock.newCapture();
-        worker.addConnector(EasyMock.capture(capturedConfig), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.capture(capturedConfig), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         EasyMock.expectLastCall();
         // Generate same task config, which should result in no additional action to restart tasks
         EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
@@ -270,16 +282,19 @@ public class StandaloneHerderTest {
         PowerMock.verifyAll();
     }
 
-    private void expectAdd(String name, Class<? extends Connector> connClass, Class<? extends Task> taskClass,
+    private void expectAdd(String name,
+                           Class<? extends Connector> connClass,
+                           Class<? extends Task> taskClass,
                            boolean sink) throws Exception {
         Map<String, String> connectorProps = connectorConfig(name, connClass);
 
-        worker.addConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class));
-        PowerMock.expectLastCall();
+        worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class),
+                EasyMock.eq(herder));
+        EasyMock.expectLastCall();
 
         ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connectorProps, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
         createCallback.onCompletion(null, new Herder.Created<>(true, connInfo));
-        PowerMock.expectLastCall();
+        EasyMock.expectLastCall();
 
         // And we should instantiate the tasks. For a sink task, we should see added properties for
         // the input topic partitions
@@ -287,12 +302,15 @@ public class StandaloneHerderTest {
         EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
                 .andReturn(Collections.singletonList(generatedTaskProps));
 
-        worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps));
-        PowerMock.expectLastCall();
+        worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps), herder);
+        EasyMock.expectLastCall();
     }
 
     private void expectStop() {
-        worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0));
+        ConnectorTaskId task = new ConnectorTaskId(CONNECTOR_NAME, 0);
+        worker.stopTasks(Collections.singleton(task));
+        EasyMock.expectLastCall();
+        worker.awaitStopTasks(Collections.singleton(task));
         EasyMock.expectLastCall();
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall();

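The new expectations in expectStop() above imply a three-step shutdown: signal the tasks to stop, wait for them to finish, then stop the connector. A minimal sketch of that sequence against the Worker methods the mocks exercise (the herder code itself is not shown in this part of the patch):

    import java.util.Collections;
    import java.util.Set;
    import org.apache.kafka.connect.util.ConnectorTaskId;

    // Shutdown order implied by the mock expectations in expectStop().
    void stopConnectorAndTasks(Worker worker, String connName) {
        Set<ConnectorTaskId> tasks = Collections.singleton(new ConnectorTaskId(connName, 0));
        worker.stopTasks(tasks);       // asynchronously signal the tasks to stop
        worker.awaitStopTasks(tasks);  // block until the tasks have fully stopped
        worker.stopConnector(connName);
    }
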
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
new file mode 100644
index 0000000..cdbab64
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.easymock.Capture;
+import org.easymock.EasyMockSupport;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.newCapture;
+import static org.junit.Assert.assertEquals;
+
+public class KafkaStatusBackingStoreTest extends EasyMockSupport {
+
+    private static final String STATUS_TOPIC = "status-topic";
+    private static final String WORKER_ID = "localhost:8083";
+    private static final String CONNECTOR = "conn";
+    private static final ConnectorTaskId TASK = new ConnectorTaskId(CONNECTOR, 0);
+
+    @Test
+    public void putConnectorState() {
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        byte[] value = new byte[0];
+        expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+                .andStubReturn(value);
+
+        final Capture<Callback> callbackCapture = newCapture();
+        kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture));
+        expectLastCall()
+                .andAnswer(new IAnswer<Void>() {
+                    @Override
+                    public Void answer() throws Throwable {
+                        callbackCapture.getValue().onCompletion(null, null);
+                        return null;
+                    }
+                });
+        replayAll();
+
+        ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
+        store.put(status);
+
+        // state is not visible until read back from the log
+        assertEquals(null, store.get(CONNECTOR));
+
+        verifyAll();
+    }
+
+    @Test
+    public void putConnectorStateRetriableFailure() {
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        byte[] value = new byte[0];
+        expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+                .andStubReturn(value);
+
+        final Capture<Callback> callbackCapture = newCapture();
+        kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture));
+        expectLastCall()
+                .andAnswer(new IAnswer<Void>() {
+                    @Override
+                    public Void answer() throws Throwable {
+                        callbackCapture.getValue().onCompletion(null, new TimeoutException());
+                        return null;
+                    }
+                })
+                .andAnswer(new IAnswer<Void>() {
+                    @Override
+                    public Void answer() throws Throwable {
+                        callbackCapture.getValue().onCompletion(null, null);
+                        return null;
+                    }
+                });
+        replayAll();
+
+        ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
+        store.put(status);
+
+        // state is not visible until read back from the log
+        assertEquals(null, store.get(CONNECTOR));
+
+        verifyAll();
+    }
+
+    @Test
+    public void putConnectorStateNonRetriableFailure() {
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        byte[] value = new byte[0];
+        expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+                .andStubReturn(value);
+
+        final Capture<Callback> callbackCapture = newCapture();
+        kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture));
+        expectLastCall()
+                .andAnswer(new IAnswer<Void>() {
+                    @Override
+                    public Void answer() throws Throwable {
+                        callbackCapture.getValue().onCompletion(null, new UnknownServerException());
+                        return null;
+                    }
+                });
+        replayAll();
+
+        // the error is logged and ignored
+        ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
+        store.put(status);
+
+        // state is not visible until read back from the log
+        assertEquals(null, store.get(CONNECTOR));
+
+        verifyAll();
+    }
+
+    @Test
+    public void putSafeConnectorIgnoresStaleStatus() {
+        byte[] value = new byte[0];
+        String otherWorkerId = "anotherhost:8083";
+
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        // the persisted state came from a different host and has a newer generation
+        Map<String, Object> statusMap = new HashMap<>();
+        statusMap.put("worker_id", otherWorkerId);
+        statusMap.put("state", "RUNNING");
+        statusMap.put("generation", 1L);
+
+        expect(converter.toConnectData(STATUS_TOPIC, value))
+                .andReturn(new SchemaAndValue(null, statusMap));
+
+        // we're verifying that there is no call to KafkaBasedLog.send
+
+        replayAll();
+
+        store.read(consumerRecord(0, "status-connector-conn", value));
+        store.putSafe(new ConnectorStatus(CONNECTOR, ConnectorStatus.State.UNASSIGNED, WORKER_ID, 0));
+
+        ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, otherWorkerId, 1);
+        assertEquals(status, store.get(CONNECTOR));
+
+        verifyAll();
+    }
+
+    @Test
+    public void putSafeOverridesValueSetBySameWorker() {
+        final byte[] value = new byte[0];
+
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        // the persisted state came from the same host, but has a newer generation
+        Map<String, Object> firstStatusRead = new HashMap<>();
+        firstStatusRead.put("worker_id", WORKER_ID);
+        firstStatusRead.put("state", "RUNNING");
+        firstStatusRead.put("generation", 1L);
+
+        Map<String, Object> secondStatusRead = new HashMap<>();
+        secondStatusRead.put("worker_id", WORKER_ID);
+        secondStatusRead.put("state", "UNASSIGNED");
+        secondStatusRead.put("generation", 0L);
+
+        expect(converter.toConnectData(STATUS_TOPIC, value))
+                .andReturn(new SchemaAndValue(null, firstStatusRead))
+                .andReturn(new SchemaAndValue(null, secondStatusRead));
+
+        expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+                .andStubReturn(value);
+
+        final Capture<Callback> callbackCapture = newCapture();
+        kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture));
+        expectLastCall()
+                .andAnswer(new IAnswer<Void>() {
+                    @Override
+                    public Void answer() throws Throwable {
+                        callbackCapture.getValue().onCompletion(null, null);
+                        store.read(consumerRecord(1, "status-connector-conn", value));
+                        return null;
+                    }
+                });
+
+        replayAll();
+
+        store.read(consumerRecord(0, "status-connector-conn", value));
+        store.putSafe(new ConnectorStatus(CONNECTOR, ConnectorStatus.State.UNASSIGNED, WORKER_ID, 0));
+
+        ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.UNASSIGNED, WORKER_ID, 0);
+        assertEquals(status, store.get(CONNECTOR));
+
+        verifyAll();
+    }
+
+    @Test
+    public void putConnectorStateShouldOverride() {
+        final byte[] value = new byte[0];
+        String otherWorkerId = "anotherhost:8083";
+
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        // the persisted state came from a different host and has a newer generation
+        Map<String, Object> firstStatusRead = new HashMap<>();
+        firstStatusRead.put("worker_id", otherWorkerId);
+        firstStatusRead.put("state", "RUNNING");
+        firstStatusRead.put("generation", 1L);
+
+        Map<String, Object> secondStatusRead = new HashMap<>();
+        secondStatusRead.put("worker_id", WORKER_ID);
+        secondStatusRead.put("state", "UNASSIGNED");
+        secondStatusRead.put("generation", 0L);
+
+        expect(converter.toConnectData(STATUS_TOPIC, value))
+                .andReturn(new SchemaAndValue(null, firstStatusRead))
+                .andReturn(new SchemaAndValue(null, secondStatusRead));
+
+        expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+                .andStubReturn(value);
+
+        final Capture<Callback> callbackCapture = newCapture();
+        kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture));
+        expectLastCall()
+                .andAnswer(new IAnswer<Void>() {
+                    @Override
+                    public Void answer() throws Throwable {
+                        callbackCapture.getValue().onCompletion(null, null);
+                        store.read(consumerRecord(1, "status-connector-conn", value));
+                        return null;
+                    }
+                });
+        replayAll();
+
+        store.read(consumerRecord(0, "status-connector-conn", value));
+
+        ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.UNASSIGNED, WORKER_ID, 0);
+        store.put(status);
+        assertEquals(status, store.get(CONNECTOR));
+
+        verifyAll();
+    }
+
+    @Test
+    public void readConnectorState() {
+        byte[] value = new byte[0];
+
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        Map<String, Object> statusMap = new HashMap<>();
+        statusMap.put("worker_id", WORKER_ID);
+        statusMap.put("state", "RUNNING");
+        statusMap.put("generation", 0L);
+
+        expect(converter.toConnectData(STATUS_TOPIC, value))
+                .andReturn(new SchemaAndValue(null, statusMap));
+
+        replayAll();
+
+        store.read(consumerRecord(0, "status-connector-conn", value));
+
+        ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
+        assertEquals(status, store.get(CONNECTOR));
+
+        verifyAll();
+    }
+
+    @Test
+    public void putTaskState() {
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        byte[] value = new byte[0];
+        expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+                .andStubReturn(value);
+
+        final Capture<Callback> callbackCapture = newCapture();
+        kafkaBasedLog.send(eq("status-task-conn-0"), eq(value), capture(callbackCapture));
+        expectLastCall()
+                .andAnswer(new IAnswer<Void>() {
+                    @Override
+                    public Void answer() throws Throwable {
+                        callbackCapture.getValue().onCompletion(null, null);
+                        return null;
+                    }
+                });
+        replayAll();
+
+        TaskStatus status = new TaskStatus(TASK, TaskStatus.State.RUNNING, WORKER_ID, 0);
+        store.put(status);
+
+        // state is not visible until read back from the log
+        assertEquals(null, store.get(TASK));
+
+        verifyAll();
+    }
+
+    @Test
+    public void readTaskState() {
+        byte[] value = new byte[0];
+
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        Map<String, Object> statusMap = new HashMap<>();
+        statusMap.put("worker_id", WORKER_ID);
+        statusMap.put("state", "RUNNING");
+        statusMap.put("generation", 0L);
+
+        expect(converter.toConnectData(STATUS_TOPIC, value))
+                .andReturn(new SchemaAndValue(null, statusMap));
+
+        replayAll();
+
+        store.read(consumerRecord(0, "status-task-conn-0", value));
+
+        TaskStatus status = new TaskStatus(TASK, TaskStatus.State.RUNNING, WORKER_ID, 0);
+        assertEquals(status, store.get(TASK));
+
+        verifyAll();
+    }
+
+    private static ConsumerRecord<String, byte[]> consumerRecord(long offset, String key, byte[] value) {
+        return new ConsumerRecord<>(STATUS_TOPIC, 0, offset, System.currentTimeMillis(),
+                TimestampType.CREATE_TIME, key, value);
+    }
+
+}

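The record keys asserted throughout these tests follow a simple scheme: "status-connector-<name>" for connectors and "status-task-<connector>-<taskId>" for tasks. Hypothetical helpers reproducing the keys the tests expect (the store's real key-building code lives in the non-test half of the patch):

    // Key scheme inferred from the test expectations; the helper names are assumptions.
    static String connectorKey(String connector) {
        return "status-connector-" + connector;                        // e.g. "status-connector-conn"
    }

    static String taskKey(ConnectorTaskId task) {
        return "status-task-" + task.connector() + "-" + task.task();  // e.g. "status-task-conn-0"
    }
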
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryStatusBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryStatusBackingStoreTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryStatusBackingStoreTest.java
new file mode 100644
index 0000000..40aee37
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryStatusBackingStoreTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class MemoryStatusBackingStoreTest {
+
+    @Test
+    public void putAndGetConnectorStatus() {
+        MemoryStatusBackingStore store = new MemoryStatusBackingStore();
+        ConnectorStatus status = new ConnectorStatus("connector", ConnectorStatus.State.RUNNING, "localhost:8083", 0);
+        store.put(status);
+        assertEquals(status, store.get("connector"));
+    }
+
+    @Test
+    public void putAndGetTaskStatus() {
+        MemoryStatusBackingStore store = new MemoryStatusBackingStore();
+        ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
+        TaskStatus status = new TaskStatus(taskId, ConnectorStatus.State.RUNNING, "localhost:8083", 0);
+        store.put(status);
+        assertEquals(status, store.get(taskId));
+        assertEquals(Collections.singleton(status), store.getAll("connector"));
+    }
+
+    @Test
+    public void deleteConnectorStatus() {
+        MemoryStatusBackingStore store = new MemoryStatusBackingStore();
+        store.put(new ConnectorStatus("connector", ConnectorStatus.State.RUNNING, "localhost:8083", 0));
+        store.put(new ConnectorStatus("connector", ConnectorStatus.State.DESTROYED, "localhost:8083", 0));
+        assertNull(store.get("connector"));
+    }
+
+    @Test
+    public void deleteTaskStatus() {
+        MemoryStatusBackingStore store = new MemoryStatusBackingStore();
+        ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
+        store.put(new TaskStatus(taskId, ConnectorStatus.State.RUNNING, "localhost:8083", 0));
+        store.put(new TaskStatus(taskId, ConnectorStatus.State.DESTROYED, "localhost:8083", 0));
+        assertNull(store.get(taskId));
+    }
+
+}

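The two delete tests rely on DESTROYED acting as a tombstone: putting a DESTROYED status removes the entry instead of storing it. A minimal sketch of that behavior for the connector side, assuming a plain map-backed store and the id()/state() accessors (not the committed implementation):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.connect.runtime.ConnectorStatus;

    // Tombstone semantics exercised by deleteConnectorStatus(); illustrative only.
    public class MapBackedConnectorStatusStore {
        private final Map<String, ConnectorStatus> connectors = new HashMap<>();

        public void put(ConnectorStatus status) {
            if (status.state() == ConnectorStatus.State.DESTROYED)
                connectors.remove(status.id());   // DESTROYED deletes the entry
            else
                connectors.put(status.id(), status);
        }

        public ConnectorStatus get(String connector) {
            return connectors.get(connector);
        }
    }
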
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/util/TableTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TableTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TableTest.java
new file mode 100644
index 0000000..ee266b5
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TableTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.util;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TableTest {
+
+    @Test
+    public void basicOperations() {
+        Table<String, Integer, String> table = new Table<>();
+        table.put("foo", 5, "bar");
+        table.put("foo", 6, "baz");
+        assertEquals("bar", table.get("foo", 5));
+        assertEquals("baz", table.get("foo", 6));
+
+        Map<Integer, String> row = table.row("foo");
+        assertEquals("bar", row.get(5));
+        assertEquals("baz", row.get(6));
+
+        assertEquals("bar", table.remove("foo", 5));
+        assertNull(table.get("foo", 5));
+        assertEquals("baz", table.remove("foo", 6));
+        assertNull(table.get("foo", 6));
+        assertTrue(table.row("foo").isEmpty());
+    }
+
+}

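TableTest above pins down the full contract of the new Table utility: a map keyed by (row, column) plus a row(...) view. A sketch that satisfies the test using nested HashMaps (the committed org.apache.kafka.connect.util.Table may differ in details):

    import java.util.HashMap;
    import java.util.Map;

    // Two-level map satisfying TableTest; sketch only.
    public class Table<R, C, V> {
        private final Map<R, Map<C, V>> table = new HashMap<>();

        public V put(R row, C column, V value) {
            Map<C, V> columns = table.get(row);
            if (columns == null) {
                columns = new HashMap<>();
                table.put(row, columns);
            }
            return columns.put(column, value);
        }

        public V get(R row, C column) {
            Map<C, V> columns = table.get(row);
            return columns == null ? null : columns.get(column);
        }

        public V remove(R row, C column) {
            Map<C, V> columns = table.get(row);
            return columns == null ? null : columns.remove(column);
        }

        public Map<C, V> row(R row) {
            Map<C, V> columns = table.get(row);
            return columns == null ? new HashMap<C, V>() : columns;
        }
    }
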
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index a6e902f..76336e1 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -181,10 +181,12 @@ class ConnectStandaloneService(ConnectServiceBase):
 class ConnectDistributedService(ConnectServiceBase):
     """Runs Kafka Connect in distributed mode."""
 
-    def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets", configs_topic="connect-configs"):
+    def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
+                 configs_topic="connect-configs", status_topic="connect-status"):
         super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files)
         self.offsets_topic = offsets_topic
         self.configs_topic = configs_topic
+        self.status_topic = status_topic
 
     def start_cmd(self, node):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/tests/kafkatest/tests/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect_distributed_test.py b/tests/kafkatest/tests/connect_distributed_test.py
index 1f82e63..9aa16ab 100644
--- a/tests/kafkatest/tests/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect_distributed_test.py
@@ -33,6 +33,7 @@ class ConnectDistributedTest(KafkaTest):
     TOPIC = "test"
     OFFSETS_TOPIC = "connect-offsets"
     CONFIG_TOPIC = "connect-configs"
+    STATUS_TOPIC = "connect-status"
 
     # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
     # across all nodes.

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/tests/kafkatest/tests/connect_rest_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect_rest_test.py b/tests/kafkatest/tests/connect_rest_test.py
index 8e713d4..69a8cb7 100644
--- a/tests/kafkatest/tests/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect_rest_test.py
@@ -30,6 +30,7 @@ class ConnectRestApiTest(KafkaTest):
     TOPIC = "test"
     OFFSETS_TOPIC = "connect-offsets"
     CONFIG_TOPIC = "connect-configs"
+    STATUS_TOPIC = "connect-status"
 
     # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
     # across all nodes.
@@ -160,4 +161,5 @@ class ConnectRestApiTest(KafkaTest):
             return []
 
     def _config_dict_from_props(self, connector_props):
-        return dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
\ No newline at end of file
+        return dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/tests/kafkatest/tests/templates/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/connect-distributed.properties b/tests/kafkatest/tests/templates/connect-distributed.properties
index 8a9f6c7..7a7440a 100644
--- a/tests/kafkatest/tests/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/templates/connect-distributed.properties
@@ -33,6 +33,7 @@ internal.value.converter.schemas.enable=false
 
 offset.storage.topic={{ OFFSETS_TOPIC }}
 config.storage.topic={{ CONFIG_TOPIC }}
+status.storage.topic={{ STATUS_TOPIC }}
 
 # Make sure data gets flushed frequently so tests don't have to wait to ensure they see data in output systems
 offset.flush.interval.ms=5000

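With this addition a distributed Connect worker names three internal topics in its configuration. For reference, the template above renders to something like the following under the defaults used in the tests:

    offset.storage.topic=connect-offsets
    config.storage.topic=connect-configs
    status.storage.topic=connect-status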
