kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [1/2] kafka git commit: KAFKA-5900: Add task metrics common to both sink and source tasks
Date Wed, 27 Sep 2017 05:23:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8256f882c -> 73cc41666


http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectorMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectorMetrics.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectorMetrics.java
deleted file mode 100644
index a717492..0000000
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectorMetrics.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.runtime;
-
-import org.apache.kafka.connect.util.MockTime;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class MockConnectorMetrics extends ConnectMetrics {
-
-    private static final Map<String, String> DEFAULT_WORKER_CONFIG = new HashMap<>();
-    static {
-        DEFAULT_WORKER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        DEFAULT_WORKER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-    }
-
-    public MockConnectorMetrics() {
-        super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG),
new MockTime());
-    }
-
-    @Override
-    public MockTime time() {
-        return (MockTime) super.time();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/StateTrackerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/StateTrackerTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/StateTrackerTest.java
new file mode 100644
index 0000000..7423854
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/StateTrackerTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.runtime.AbstractStatus.State;
+import org.apache.kafka.connect.util.MockTime;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class StateTrackerTest {
+
+    private static final double DELTA = 0.000001d;
+
+    private StateTracker tracker;
+    private MockTime time;
+    private State state;
+
+    @Before
+    public void setUp() {
+        time = new MockTime();
+        time.sleep(1000L);
+        tracker = new StateTracker();
+        state = State.UNASSIGNED;
+    }
+
+    @Test
+    public void currentStateIsNullWhenNotInitialized() {
+        assertNull(tracker.currentState());
+    }
+
+    @Test
+    public void currentState() {
+        for (State state : State.values()) {
+            tracker.changeState(state, time.milliseconds());
+            assertEquals(state, tracker.currentState());
+        }
+    }
+
+    @Test
+    public void calculateDurations() {
+        tracker.changeState(State.UNASSIGNED, time.milliseconds());
+        time.sleep(1000L);
+        assertEquals(1.0d, tracker.durationRatio(State.UNASSIGNED, time.milliseconds()),
DELTA);
+        assertEquals(0.0d, tracker.durationRatio(State.RUNNING, time.milliseconds()), DELTA);
+        assertEquals(0.0d, tracker.durationRatio(State.PAUSED, time.milliseconds()), DELTA);
+        assertEquals(0.0d, tracker.durationRatio(State.FAILED, time.milliseconds()), DELTA);
+        assertEquals(0.0d, tracker.durationRatio(State.DESTROYED, time.milliseconds()), DELTA);
+
+        tracker.changeState(State.RUNNING, time.milliseconds());
+        time.sleep(3000L);
+        assertEquals(0.25d, tracker.durationRatio(State.UNASSIGNED, time.milliseconds()),
DELTA);
+        assertEquals(0.75d, tracker.durationRatio(State.RUNNING, time.milliseconds()), DELTA);
+        assertEquals(0.0d, tracker.durationRatio(State.PAUSED, time.milliseconds()), DELTA);
+        assertEquals(0.0d, tracker.durationRatio(State.FAILED, time.milliseconds()), DELTA);
+        assertEquals(0.0d, tracker.durationRatio(State.DESTROYED, time.milliseconds()), DELTA);
+
+        tracker.changeState(State.PAUSED, time.milliseconds());
+        time.sleep(4000L);
+        assertEquals(0.125d, tracker.durationRatio(State.UNASSIGNED, time.milliseconds()),
DELTA);
+        assertEquals(0.375d, tracker.durationRatio(State.RUNNING, time.milliseconds()), DELTA);
+        assertEquals(0.500d, tracker.durationRatio(State.PAUSED, time.milliseconds()), DELTA);
+        assertEquals(0.0d, tracker.durationRatio(State.FAILED, time.milliseconds()), DELTA);
+        assertEquals(0.0d, tracker.durationRatio(State.DESTROYED, time.milliseconds()), DELTA);
+
+        tracker.changeState(State.RUNNING, time.milliseconds());
+        time.sleep(8000L);
+        assertEquals(0.0625d, tracker.durationRatio(State.UNASSIGNED, time.milliseconds()),
DELTA);
+        assertEquals(0.6875d, tracker.durationRatio(State.RUNNING, time.milliseconds()),
DELTA);
+        assertEquals(0.2500d, tracker.durationRatio(State.PAUSED, time.milliseconds()), DELTA);
+        assertEquals(0.0d, tracker.durationRatio(State.FAILED, time.milliseconds()), DELTA);
+        assertEquals(0.0d, tracker.durationRatio(State.DESTROYED, time.milliseconds()), DELTA);
+
+        tracker.changeState(State.FAILED, time.milliseconds());
+        time.sleep(16000L);
+        assertEquals(0.03125d, tracker.durationRatio(State.UNASSIGNED, time.milliseconds()),
DELTA);
+        assertEquals(0.34375d, tracker.durationRatio(State.RUNNING, time.milliseconds()),
DELTA);
+        assertEquals(0.12500d, tracker.durationRatio(State.PAUSED, time.milliseconds()),
DELTA);
+        assertEquals(0.50000d, tracker.durationRatio(State.FAILED, time.milliseconds()),
DELTA);
+        assertEquals(0.0d, tracker.durationRatio(State.DESTROYED, time.milliseconds()), DELTA);
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
index 2101a33..5f03f5a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
@@ -55,7 +55,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
     @Before
     public void setup() {
         connectorConfig = new ConnectorConfig(plugins, CONFIG);
-        metrics = new MockConnectorMetrics();
+        metrics = new MockConnectMetrics();
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index a680688..524c022 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -42,6 +42,7 @@ import org.easymock.Capture;
 import org.easymock.CaptureType;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -112,6 +113,7 @@ public class WorkerSinkTaskTest {
     private SinkTask sinkTask;
     private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
     private WorkerConfig workerConfig;
+    private ConnectMetrics metrics;
     @Mock
     private PluginClassLoader pluginLoader;
     @Mock
@@ -142,19 +144,25 @@ public class WorkerSinkTaskTest {
         workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
         workerConfig = new StandaloneConfig(workerProps);
         pluginLoader = PowerMock.createMock(PluginClassLoader.class);
+        metrics = new MockConnectMetrics();
+        recordsReturnedTp1 = 0;
+        recordsReturnedTp3 = 0;
+    }
+
+    private void createTask(TargetState initialState) {
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter,
valueConverter, transformationChain, pluginLoader, time);
+                taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter,
valueConverter, transformationChain, pluginLoader, time);
+    }
 
-        recordsReturnedTp1 = 0;
-        recordsReturnedTp3 = 0;
+    @After
+    public void tearDown() {
+        if (metrics != null) metrics.stop();
     }
 
     @Test
     public void testStartPaused() throws Exception {
-        workerTask = PowerMock.createPartialMock(
-                WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, keyConverter,
valueConverter, transformationChain, pluginLoader, time);
+        createTask(TargetState.PAUSED);
 
         expectInitializeTask();
         expectPollInitialAssignment();
@@ -175,6 +183,8 @@ public class WorkerSinkTaskTest {
 
     @Test
     public void testPause() throws Exception {
+        createTask(initialState);
+
         expectInitializeTask();
         expectPollInitialAssignment();
 
@@ -233,6 +243,8 @@ public class WorkerSinkTaskTest {
 
     @Test
     public void testPollRedelivery() throws Exception {
+        createTask(initialState);
+
         expectInitializeTask();
         expectPollInitialAssignment();
 
@@ -274,6 +286,8 @@ public class WorkerSinkTaskTest {
     public void testErrorInRebalancePartitionRevocation() throws Exception {
         RuntimeException exception = new RuntimeException("Revocation error");
 
+        createTask(initialState);
+
         expectInitializeTask();
         expectPollInitialAssignment();
         expectRebalanceRevocationError(exception);
@@ -297,6 +311,8 @@ public class WorkerSinkTaskTest {
     public void testErrorInRebalancePartitionAssignment() throws Exception {
         RuntimeException exception = new RuntimeException("Assignment error");
 
+        createTask(initialState);
+
         expectInitializeTask();
         expectPollInitialAssignment();
         expectRebalanceAssignmentError(exception);
@@ -318,6 +334,8 @@ public class WorkerSinkTaskTest {
 
     @Test
     public void testWakeupInCommitSyncCausesRetry() throws Exception {
+        createTask(initialState);
+
         expectInitializeTask();
 
         expectPollInitialAssignment();
@@ -386,6 +404,8 @@ public class WorkerSinkTaskTest {
 
     @Test
     public void testRequestCommit() throws Exception {
+        createTask(initialState);
+
         expectInitializeTask();
 
         expectPollInitialAssignment();
@@ -437,6 +457,8 @@ public class WorkerSinkTaskTest {
 
     @Test
     public void testPreCommit() throws Exception {
+        createTask(initialState);
+
         expectInitializeTask();
 
         // iter 1
@@ -502,6 +524,8 @@ public class WorkerSinkTaskTest {
 
     @Test
     public void testIgnoredCommit() throws Exception {
+        createTask(initialState);
+
         expectInitializeTask();
 
         // iter 1
@@ -550,6 +574,8 @@ public class WorkerSinkTaskTest {
     // when there is a long running commit in process. See KAFKA-4942 for more information.
     @Test
     public void testLongRunningCommitWithoutTimeout() throws Exception {
+        createTask(initialState);
+
         expectInitializeTask();
 
         // iter 1
@@ -647,6 +673,8 @@ public class WorkerSinkTaskTest {
     // See KAFKA-5731 for more information.
     @Test
     public void testCommitWithOutOfOrderCallback() throws Exception {
+        createTask(initialState);
+
         expectInitializeTask();
 
         // iter 1
@@ -827,6 +855,8 @@ public class WorkerSinkTaskTest {
 
     @Test
     public void testDeliveryWithMutatingTransform() throws Exception {
+        createTask(initialState);
+
         expectInitializeTask();
 
         expectPollInitialAssignment();
@@ -872,12 +902,15 @@ public class WorkerSinkTaskTest {
         assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been
cleared
         assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask,
"lastCommittedOffsets"));
         assertEquals(0, workerTask.commitFailures());
+        assertEquals(1.0, workerTask.taskMetricsGroup().currentMetricValue("batch-size-max"),
0.0001);
 
         PowerMock.verifyAll();
     }
 
     @Test
     public void testMissingTimestampPropagation() throws Exception {
+        createTask(initialState);
+
         expectInitializeTask();
         expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, TimestampType.CREATE_TIME);
         expectConversionAndTransformation(1);
@@ -906,6 +939,8 @@ public class WorkerSinkTaskTest {
         final Long timestamp = System.currentTimeMillis();
         final TimestampType timestampType = TimestampType.CREATE_TIME;
 
+        createTask(initialState);
+
         expectInitializeTask();
         expectConsumerPoll(1, timestamp, timestampType);
         expectConversionAndTransformation(1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 6f77f65..b7d21d5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -42,6 +42,7 @@ import org.easymock.CaptureType;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.easymock.IExpectationSetters;
+import org.junit.After;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
@@ -99,6 +100,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private TargetState initialState = TargetState.STARTED;
     private Time time;
+    private ConnectMetrics metrics;
     @Mock private SinkTask sinkTask;
     private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
     private WorkerConfig workerConfig;
@@ -119,6 +121,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     public void setup() {
         super.setup();
         time = new MockTime();
+        metrics = new MockConnectMetrics();
         Map<String, String> workerProps = new HashMap<>();
         workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
         workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
@@ -131,12 +134,17 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter,
-                valueConverter, TransformationChain.<SinkRecord>noOp(), pluginLoader,
time);
+                taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter,
+                valueConverter, TransformationChain.noOp(), pluginLoader, time);
 
         recordsReturned = 0;
     }
 
+    @After
+    public void tearDown() {
+        if (metrics != null) metrics.stop();
+    }
+
     @Test
     public void testPollsInBackground() throws Exception {
         expectInitializeTask();

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index a3ddb3e..cf19522 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -38,11 +38,13 @@ import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.easymock.IExpectationSetters;
+import org.junit.After;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
 import org.powermock.api.easymock.annotation.Mock;
 import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
@@ -64,6 +66,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+@PowerMockIgnore("javax.management.*")
 @RunWith(PowerMockRunner.class)
 public class WorkerSourceTaskTest extends ThreadedTest {
     private static final String TOPIC = "topic";
@@ -84,6 +87,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private WorkerConfig config;
     private Plugins plugins;
+    private ConnectMetrics metrics;
     @Mock private SourceTask sourceTask;
     @Mock private Converter keyConverter;
     @Mock private Converter valueConverter;
@@ -121,6 +125,12 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         plugins = new Plugins(workerProps);
         config = new StandaloneConfig(workerProps);
         producerCallbacks = EasyMock.newCapture();
+        metrics = new MockConnectMetrics();
+    }
+
+    @After
+    public void tearDown() {
+        if (metrics != null) metrics.stop();
     }
 
     private void createWorkerTask() {
@@ -129,7 +139,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
     private void createWorkerTask(TargetState initialState) {
         workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState,
keyConverter, valueConverter, transformationChain,
-                producer, offsetReader, offsetWriter, config, plugins.delegatingLoader(),
Time.SYSTEM);
+                producer, offsetReader, offsetWriter, config, metrics, plugins.delegatingLoader(),
Time.SYSTEM);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 871c887..e55c7fa 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.WorkerTask.TaskMetricsGroup;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.MockTime;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -30,6 +35,7 @@ import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.partialMockBuilder;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
 
 public class WorkerTaskTest {
 
@@ -39,6 +45,18 @@ public class WorkerTaskTest {
     }
     private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
 
+    private ConnectMetrics metrics;
+
+    @Before
+    public void setup() {
+        metrics = new MockConnectMetrics();
+    }
+
+    @After
+    public void tearDown() {
+        if (metrics != null) metrics.stop();
+    }
+
     @Test
     public void standardStartup() {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
@@ -51,9 +69,10 @@ public class WorkerTaskTest {
                         ConnectorTaskId.class,
                         TaskStatus.Listener.class,
                         TargetState.class,
-                        ClassLoader.class
+                        ClassLoader.class,
+                        ConnectMetrics.class
                 )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader)
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
@@ -71,6 +90,9 @@ public class WorkerTaskTest {
         workerTask.close();
         expectLastCall();
 
+        workerTask.releaseResources();
+        EasyMock.expectLastCall();
+
         statusListener.onShutdown(taskId);
         expectLastCall();
 
@@ -96,9 +118,10 @@ public class WorkerTaskTest {
                         ConnectorTaskId.class,
                         TaskStatus.Listener.class,
                         TargetState.class,
-                        ClassLoader.class
+                        ClassLoader.class,
+                        ConnectMetrics.class
                 )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader)
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
@@ -110,6 +133,9 @@ public class WorkerTaskTest {
         workerTask.close();
         EasyMock.expectLastCall();
 
+        workerTask.releaseResources();
+        EasyMock.expectLastCall();
+
         replay(workerTask);
 
         workerTask.initialize(TASK_CONFIG);
@@ -134,9 +160,10 @@ public class WorkerTaskTest {
                         ConnectorTaskId.class,
                         TaskStatus.Listener.class,
                         TargetState.class,
-                        ClassLoader.class
+                        ClassLoader.class,
+                        ConnectMetrics.class
                 )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader)
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
@@ -171,6 +198,9 @@ public class WorkerTaskTest {
         workerTask.close();
         expectLastCall();
 
+        workerTask.releaseResources();
+        EasyMock.expectLastCall();
+
         // there should be no call to onShutdown()
 
         replay(workerTask);
@@ -186,7 +216,124 @@ public class WorkerTaskTest {
         verify(workerTask);
     }
 
+    @Test
+    public void updateMetricsOnListenerEventsForStartupPauseResumeAndShutdown() {
+        ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
+        ConnectMetrics metrics = new MockConnectMetrics();
+        TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+        TaskMetricsGroup group = new TaskMetricsGroup(taskId, metrics, statusListener);
+
+        statusListener.onStartup(taskId);
+        expectLastCall();
+
+        statusListener.onPause(taskId);
+        expectLastCall();
+
+        statusListener.onResume(taskId);
+        expectLastCall();
+
+        statusListener.onShutdown(taskId);
+        expectLastCall();
+
+        replay(statusListener);
+
+        group.onStartup(taskId);
+        assertRunningMetric(group);
+        group.onPause(taskId);
+        assertPausedMetric(group);
+        group.onResume(taskId);
+        assertRunningMetric(group);
+        group.onShutdown(taskId);
+        assertStoppedMetric(group);
+
+        verify(statusListener);
+    }
+
+    @Test
+    public void updateMetricsOnListenerEventsForStartupPauseResumeAndFailure() {
+        ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
+        MockConnectMetrics metrics = new MockConnectMetrics();
+        MockTime time = metrics.time();
+        ConnectException error = new ConnectException("error");
+        TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+        TaskMetricsGroup group = new TaskMetricsGroup(taskId, metrics, statusListener);
+
+        statusListener.onStartup(taskId);
+        expectLastCall();
+
+        statusListener.onPause(taskId);
+        expectLastCall();
+
+        statusListener.onResume(taskId);
+        expectLastCall();
+
+        statusListener.onPause(taskId);
+        expectLastCall();
+
+        statusListener.onResume(taskId);
+        expectLastCall();
+
+        statusListener.onFailure(taskId, error);
+        expectLastCall();
+
+        statusListener.onShutdown(taskId);
+        expectLastCall();
+
+        replay(statusListener);
+
+        time.sleep(1000L);
+        group.onStartup(taskId);
+        assertRunningMetric(group);
+
+        time.sleep(2000L);
+        group.onPause(taskId);
+        assertPausedMetric(group);
+
+        time.sleep(3000L);
+        group.onResume(taskId);
+        assertRunningMetric(group);
+
+        time.sleep(4000L);
+        group.onPause(taskId);
+        assertPausedMetric(group);
+
+        time.sleep(5000L);
+        group.onResume(taskId);
+        assertRunningMetric(group);
+
+        time.sleep(6000L);
+        group.onFailure(taskId, error);
+        assertFailedMetric(group);
+
+        time.sleep(7000L);
+        group.onShutdown(taskId);
+        assertStoppedMetric(group);
+
+        verify(statusListener);
+
+        long totalTime = 27000L;
+        double pauseTimeRatio = (double) (3000L + 5000L) / totalTime;
+        double runningTimeRatio = (double) (2000L + 4000L + 6000L) / totalTime;
+        assertEquals(pauseTimeRatio, group.currentMetricValue("pause-ratio"), 0.000001d);
+        assertEquals(runningTimeRatio, group.currentMetricValue("running-ratio"), 0.000001d);
+    }
+
     private static abstract class TestSinkTask extends SinkTask {
     }
 
+    protected void assertFailedMetric(TaskMetricsGroup metricsGroup) {
+        assertEquals(AbstractStatus.State.FAILED, metricsGroup.state());
+    }
+
+    protected void assertPausedMetric(TaskMetricsGroup metricsGroup) {
+        assertEquals(AbstractStatus.State.PAUSED, metricsGroup.state());
+    }
+
+    protected void assertRunningMetric(TaskMetricsGroup metricsGroup) {
+        assertEquals(AbstractStatus.State.RUNNING, metricsGroup.state());
+    }
+
+    protected void assertStoppedMetric(TaskMetricsGroup metricsGroup) {
+        assertEquals(AbstractStatus.State.UNASSIGNED, metricsGroup.state());
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 91b07be..f823b09 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -43,6 +43,7 @@ import org.apache.kafka.connect.util.MockTime;
 import org.apache.kafka.connect.util.ThreadedTest;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -75,6 +76,7 @@ public class WorkerTest extends ThreadedTest {
 
     private WorkerConfig config;
     private Worker worker;
+    private ConnectMetrics metrics;
 
     @Mock
     private Plugins plugins;
@@ -102,10 +104,16 @@ public class WorkerTest extends ThreadedTest {
         workerProps.put("internal.value.converter.schemas.enable", "false");
         workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
         config = new StandaloneConfig(workerProps);
+        metrics = new MockConnectMetrics();
 
         PowerMock.mockStatic(Plugins.class);
     }
 
+    @After
+    public void tearDown() {
+        if (metrics != null) metrics.stop();
+    }
+
     @Test
     public void testStartAndStopConnector() throws Exception {
         expectConverters();
@@ -450,6 +458,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(OffsetStorageReader.class),
                 EasyMock.anyObject(OffsetStorageWriter.class),
                 EasyMock.eq(config),
+                EasyMock.anyObject(ConnectMetrics.class),
                 EasyMock.anyObject(ClassLoader.class),
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);
@@ -572,6 +581,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(OffsetStorageReader.class),
                 EasyMock.anyObject(OffsetStorageWriter.class),
                 EasyMock.anyObject(WorkerConfig.class),
+                EasyMock.anyObject(ConnectMetrics.class),
                 EasyMock.eq(pluginLoader),
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);
@@ -653,6 +663,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(OffsetStorageReader.class),
                 EasyMock.anyObject(OffsetStorageWriter.class),
                 EasyMock.anyObject(WorkerConfig.class),
+                EasyMock.anyObject(ConnectMetrics.class),
                 EasyMock.eq(pluginLoader),
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);


Mime
View raw message