kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [1/2] kafka git commit: KAFKA-5903: Added Connect metrics to the worker and distributed herder (KIP-196)
Date Thu, 05 Oct 2017 18:23:21 GMT
Repository: kafka
Updated Branches:
  refs/heads/1.0 5e2767a26 -> 1d026269e


http://git-wip-us.apache.org/repos/asf/kafka/blob/1d026269/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index f823b09..80c65df 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
@@ -27,6 +28,8 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -43,7 +46,6 @@ import org.apache.kafka.connect.util.MockTime;
 import org.apache.kafka.connect.util.ThreadedTest;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -76,7 +78,6 @@ public class WorkerTest extends ThreadedTest {
 
     private WorkerConfig config;
     private Worker worker;
-    private ConnectMetrics metrics;
 
     @Mock
     private Plugins plugins;
@@ -103,17 +104,12 @@ public class WorkerTest extends ThreadedTest {
         workerProps.put("internal.key.converter.schemas.enable", "false");
         workerProps.put("internal.value.converter.schemas.enable", "false");
         workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
         config = new StandaloneConfig(workerProps);
-        metrics = new MockConnectMetrics();
 
         PowerMock.mockStatic(Plugins.class);
     }
 
-    @After
-    public void tearDown() {
-        if (metrics != null) metrics.stop();
-    }
-
     @Test
     public void testStartAndStopConnector() throws Exception {
         expectConverters();
@@ -134,6 +130,8 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
 
+        EasyMock.expect(connector.version()).andReturn("1.0");
+
         EasyMock.expect(plugins.compareAndSwapLoaders(connector))
                 .andReturn(delegatingLoader)
                 .times(2);
@@ -171,10 +169,15 @@ public class WorkerTest extends ThreadedTest {
         } catch (ConnectException e) {
             // expected
         }
+        assertStatistics(worker, 1, 0);
+        assertStartupStatistics(worker, 1, 0, 0, 0);
         worker.stopConnector(CONNECTOR_ID);
+        assertStatistics(worker, 0, 0);
+        assertStartupStatistics(worker, 1, 0, 0, 0);
         assertEquals(Collections.emptySet(), worker.connectorNames());
         // Nothing should be left, so this should effectively be a nop
         worker.stop();
+        assertStatistics(worker, 0, 0);
 
         PowerMock.verifyAll();
     }
@@ -210,11 +213,17 @@ public class WorkerTest extends ThreadedTest {
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
 
+        assertStatistics(worker, 0, 0);
         assertFalse(worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener,
TargetState.STARTED));
 
+        assertStartupStatistics(worker, 1, 1, 0, 0);
         assertEquals(Collections.emptySet(), worker.connectorNames());
 
+        assertStatistics(worker, 0, 0);
+        assertStartupStatistics(worker, 1, 1, 0, 0);
         assertFalse(worker.stopConnector(CONNECTOR_ID));
+        assertStatistics(worker, 0, 0);
+        assertStartupStatistics(worker, 1, 1, 0, 0);
 
         PowerMock.verifyAll();
     }
@@ -238,6 +247,7 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector");
 
+        EasyMock.expect(connector.version()).andReturn("1.0");
         EasyMock.expect(plugins.compareAndSwapLoaders(connector))
                 .andReturn(delegatingLoader)
                 .times(2);
@@ -267,14 +277,21 @@ public class WorkerTest extends ThreadedTest {
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
 
+        assertStatistics(worker, 0, 0);
         assertEquals(Collections.emptySet(), worker.connectorNames());
         worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
+        assertStatistics(worker, 1, 0);
+        assertStartupStatistics(worker, 1, 0, 0, 0);
 
         worker.stopConnector(CONNECTOR_ID);
+        assertStatistics(worker, 0, 0);
+        assertStartupStatistics(worker, 1, 0, 0, 0);
         assertEquals(Collections.emptySet(), worker.connectorNames());
         // Nothing should be left, so this should effectively be a nop
         worker.stop();
+        assertStatistics(worker, 0, 0);
+        assertStartupStatistics(worker, 1, 0, 0, 0);
 
         PowerMock.verifyAll();
     }
@@ -298,6 +315,7 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest");
 
+        EasyMock.expect(connector.version()).andReturn("1.0");
         EasyMock.expect(plugins.compareAndSwapLoaders(connector))
                 .andReturn(delegatingLoader)
                 .times(2);
@@ -327,14 +345,18 @@ public class WorkerTest extends ThreadedTest {
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
 
+        assertStatistics(worker, 0, 0);
         assertEquals(Collections.emptySet(), worker.connectorNames());
         worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
+        assertStatistics(worker, 1, 0);
 
         worker.stopConnector(CONNECTOR_ID);
+        assertStatistics(worker, 0, 0);
         assertEquals(Collections.emptySet(), worker.connectorNames());
         // Nothing should be left, so this should effectively be a nop
         worker.stop();
+        assertStatistics(worker, 0, 0);
 
         PowerMock.verifyAll();
     }
@@ -374,6 +396,7 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
 
+        EasyMock.expect(connector.version()).andReturn("1.0");
         EasyMock.expect(plugins.compareAndSwapLoaders(connector))
                 .andReturn(delegatingLoader)
                 .times(3);
@@ -409,8 +432,10 @@ public class WorkerTest extends ThreadedTest {
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
 
+        assertStatistics(worker, 0, 0);
         assertEquals(Collections.emptySet(), worker.connectorNames());
         worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED);
+        assertStatistics(worker, 1, 0);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
         try {
             worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED);
@@ -426,10 +451,15 @@ public class WorkerTest extends ThreadedTest {
         assertEquals(2, taskConfigs.size());
         assertEquals(expectedTaskProps, taskConfigs.get(0));
         assertEquals(expectedTaskProps, taskConfigs.get(1));
+        assertStatistics(worker, 1, 0);
+        assertStartupStatistics(worker, 1, 0, 0, 0);
         worker.stopConnector(CONNECTOR_ID);
+        assertStatistics(worker, 0, 0);
+        assertStartupStatistics(worker, 1, 0, 0, 0);
         assertEquals(Collections.emptySet(), worker.connectorNames());
         // Nothing should be left, so this should effectively be a nop
         worker.stop();
+        assertStatistics(worker, 0, 0);
 
         PowerMock.verifyAll();
     }
@@ -507,13 +537,21 @@ public class WorkerTest extends ThreadedTest {
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
+        assertStatistics(worker, 0, 0);
+        assertStartupStatistics(worker, 0, 0, 0, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());
         worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener,
TargetState.STARTED);
+        assertStatistics(worker, 0, 1);
+        assertStartupStatistics(worker, 0, 0, 1, 0);
         assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
         worker.stopAndAwaitTask(TASK_ID);
+        assertStatistics(worker, 0, 0);
+        assertStartupStatistics(worker, 0, 0, 1, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());
         // Nothing should be left, so this should effectively be a nop
         worker.stop();
+        assertStatistics(worker, 0, 0);
+        assertStartupStatistics(worker, 0, 0, 1, 0);
 
         PowerMock.verifyAll();
     }
@@ -550,9 +588,14 @@ public class WorkerTest extends ThreadedTest {
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
+        assertStatistics(worker, 0, 0);
+        assertStartupStatistics(worker, 0, 0, 0, 0);
 
         assertFalse(worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener,
TargetState.STARTED));
+        assertStartupStatistics(worker, 0, 0, 1, 1);
 
+        assertStatistics(worker, 0, 0);
+        assertStartupStatistics(worker, 0, 0, 1, 1);
         assertEquals(Collections.emptySet(), worker.taskIds());
 
         PowerMock.verifyAll();
@@ -632,8 +675,11 @@ public class WorkerTest extends ThreadedTest {
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
+        assertStatistics(worker, 0, 0);
         worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener,
TargetState.STARTED);
+        assertStatistics(worker, 0, 1);
         worker.stop();
+        assertStatistics(worker, 0, 0);
 
         PowerMock.verifyAll();
     }
@@ -712,6 +758,7 @@ public class WorkerTest extends ThreadedTest {
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
+        assertStatistics(worker, 0, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());
         Map<String, String> connProps = anyConnectorConfigMap();
         connProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
@@ -719,11 +766,14 @@ public class WorkerTest extends ThreadedTest {
         connProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
         connProps.put("value.converter.extra.config", "bar");
         worker.startTask(TASK_ID, connProps, origProps, taskStatusListener, TargetState.STARTED);
+        assertStatistics(worker, 0, 1);
         assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
         worker.stopAndAwaitTask(TASK_ID);
+        assertStatistics(worker, 0, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());
         // Nothing should be left, so this should effectively be a nop
         worker.stop();
+        assertStatistics(worker, 0, 0);
 
         // Validate extra configs got passed through to overridden converters
         assertEquals("foo", keyConverter.getValue().configs.get("extra.config"));
@@ -732,6 +782,41 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.verifyAll();
     }
 
+    private void assertStatistics(Worker worker, int connectors, int tasks) {
+        MetricGroup workerMetrics = worker.workerMetricsGroup().metricGroup();
+        assertEquals(connectors, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(),
workerMetrics, "connector-count"), 0.0001d);
+        assertEquals(tasks, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(),
workerMetrics, "task-count"), 0.0001d);
+        assertEquals(tasks, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(),
workerMetrics, "task-count"), 0.0001d);
+    }
+
+    private void assertStartupStatistics(Worker worker, int connectorStartupAttempts, int
connectorStartupFailures, int taskStartupAttempts, int taskStartupFailures) {
+        double connectStartupSuccesses = connectorStartupAttempts - connectorStartupFailures;
+        double taskStartupSuccesses = taskStartupAttempts - taskStartupFailures;
+        double connectStartupSuccessPct = 0.0d;
+        double connectStartupFailurePct = 0.0d;
+        double taskStartupSuccessPct = 0.0d;
+        double taskStartupFailurePct = 0.0d;
+        if (connectorStartupAttempts != 0) {
+            connectStartupSuccessPct = connectStartupSuccesses / connectorStartupAttempts;
+            connectStartupFailurePct = (double) connectorStartupFailures / connectorStartupAttempts;
+        }
+        if (taskStartupAttempts != 0) {
+            taskStartupSuccessPct = taskStartupSuccesses / taskStartupAttempts;
+            taskStartupFailurePct = (double) taskStartupFailures / taskStartupAttempts;
+        }
+        MetricGroup workerMetrics = worker.workerMetricsGroup().metricGroup();
+        assertEquals(connectorStartupAttempts, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(),
workerMetrics, "connector-startup-attempts-total"), 0.0001d);
+        assertEquals(connectStartupSuccesses, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(),
workerMetrics, "connector-startup-success-total"), 0.0001d);
+        assertEquals(connectorStartupFailures, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(),
workerMetrics, "connector-startup-failure-total"), 0.0001d);
+        assertEquals(connectStartupSuccessPct, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(),
workerMetrics, "connector-startup-success-percentage"), 0.0001d);
+        assertEquals(connectStartupFailurePct, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(),
workerMetrics, "connector-startup-failure-percentage"), 0.0001d);
+        assertEquals(taskStartupAttempts, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(),
workerMetrics, "task-startup-attempts-total"), 0.0001d);
+        assertEquals(taskStartupSuccesses, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(),
workerMetrics, "task-startup-success-total"), 0.0001d);
+        assertEquals(taskStartupFailures, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(),
workerMetrics, "task-startup-failure-total"), 0.0001d);
+        assertEquals(taskStartupSuccessPct, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(),
workerMetrics, "task-startup-success-percentage"), 0.0001d);
+        assertEquals(taskStartupFailurePct, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(),
workerMetrics, "task-startup-failure-percentage"), 0.0001d);
+    }
+
     private void expectStartStorage() {
         offsetBackingStore.configure(EasyMock.anyObject(WorkerConfig.class));
         EasyMock.expectLastCall();

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d026269/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 40d0e2b..7483261 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -25,13 +25,16 @@ import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.MockConnectMetrics;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedHerder.HerderMetrics;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -50,6 +53,7 @@ import org.apache.kafka.connect.util.FutureCallback;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -151,6 +155,7 @@ public class DistributedHerderTest {
     @Mock private WorkerGroupMember member;
     private MockTime time;
     private DistributedHerder herder;
+    private MockConnectMetrics metrics;
     @Mock private Worker worker;
     @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;
     @Mock
@@ -165,12 +170,13 @@ public class DistributedHerderTest {
 
     @Before
     public void setUp() throws Exception {
+        time = new MockTime();
+        metrics = new MockConnectMetrics(time);
         worker = PowerMock.createMock(Worker.class);
         EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.FALSE);
-        time = new MockTime();
 
         herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff",
"connectorTypeForClass", "updateDeletedConnectorStatus"},
-                new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore,
configBackingStore, member, MEMBER_URL, time);
+                new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore,
configBackingStore, member, MEMBER_URL, metrics, time);
 
         configUpdateListener = herder.new ConfigUpdateListener();
         rebalanceListener = herder.new RebalanceListener();
@@ -182,6 +188,11 @@ public class DistributedHerderTest {
         PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
     }
 
+    @After
+    public void tearDown() {
+        if (metrics != null) metrics.stop();
+    }
+
     @Test
     public void testJoinAssignment() throws Exception {
         // Join group and get assignment
@@ -204,6 +215,8 @@ public class DistributedHerderTest {
         PowerMock.replayAll();
 
         herder.tick();
+        time.sleep(1000L);
+        assertStatistics(3, 1, 100, 1000L);
 
         PowerMock.verifyAll();
     }
@@ -241,9 +254,17 @@ public class DistributedHerderTest {
 
         PowerMock.replayAll();
 
+        time.sleep(1000L);
+        assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY);
         herder.tick();
+
+        time.sleep(2000L);
+        assertStatistics(3, 1, 100, 2000);
         herder.tick();
 
+        time.sleep(3000L);
+        assertStatistics(3, 2, 100, 3000);
+
         PowerMock.verifyAll();
     }
 
@@ -282,7 +303,12 @@ public class DistributedHerderTest {
         PowerMock.replayAll();
 
         herder.tick();
+        time.sleep(1000L);
+        assertStatistics(3, 1, 100, 1000L);
+
         herder.tick();
+        time.sleep(2000L);
+        assertStatistics(3, 2, 100, 2000L);
 
         PowerMock.verifyAll();
     }
@@ -345,6 +371,9 @@ public class DistributedHerderTest {
         herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback);
         herder.tick();
 
+        time.sleep(1000L);
+        assertStatistics(3, 1, 100, 1000L);
+
         PowerMock.verifyAll();
     }
 
@@ -390,6 +419,9 @@ public class DistributedHerderTest {
         assertTrue(error.hasCaptured());
         assertTrue(error.getValue() instanceof BadRequestException);
 
+        time.sleep(1000L);
+        assertStatistics(3, 1, 100, 1000L);
+
         PowerMock.verifyAll();
     }
 
@@ -435,6 +467,9 @@ public class DistributedHerderTest {
         assertTrue(error.hasCaptured());
         assertTrue(error.getValue() instanceof BadRequestException);
 
+        time.sleep(1000L);
+        assertStatistics(3, 1, 100, 1000L);
+
         PowerMock.verifyAll();
     }
 
@@ -478,6 +513,9 @@ public class DistributedHerderTest {
         assertTrue(error.hasCaptured());
         assertTrue(error.getValue() instanceof BadRequestException);
 
+        time.sleep(1000L);
+        assertStatistics(3, 1, 100, 1000L);
+
         PowerMock.verifyAll();
     }
 
@@ -503,6 +541,9 @@ public class DistributedHerderTest {
         herder.putConnectorConfig(CONN1, CONN1_CONFIG, false, putConnectorCallback);
         herder.tick();
 
+        time.sleep(1000L);
+        assertStatistics(3, 1, 100, 1000L);
+
         PowerMock.verifyAll();
     }
 
@@ -535,6 +576,9 @@ public class DistributedHerderTest {
         herder.deleteConnectorConfig(CONN1, putConnectorCallback);
         herder.tick();
 
+        time.sleep(1000L);
+        assertStatistics(3, 1, 100, 1000L);
+
         PowerMock.verifyAll();
     }
 
@@ -672,10 +716,16 @@ public class DistributedHerderTest {
         PowerMock.replayAll();
 
         herder.tick();
+        time.sleep(1000L);
+        assertStatistics(3, 1, 100, 1000L);
+
         FutureCallback<Void> callback = new FutureCallback<>();
         herder.restartConnector(CONN1, callback);
         herder.tick();
 
+        time.sleep(2000L);
+        assertStatistics(3, 1, 100, 3000L);
+
         try {
             callback.get(1000L, TimeUnit.MILLISECONDS);
             fail("Expected NotLeaderException to be raised");
@@ -1202,7 +1252,13 @@ public class DistributedHerderTest {
         PowerMock.replayAll();
 
         herder.tick();
+        time.sleep(1000L);
+        assertStatistics("leaderUrl", true, 3, 0, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY);
+
         herder.tick();
+        time.sleep(2000L);
+        assertStatistics("leaderUrl", false, 3, 1, 100, 2000L);
+
 
         PowerMock.verifyAll();
     }
@@ -1365,7 +1421,8 @@ public class DistributedHerderTest {
                     rebalanceListener.onRevoked("leader", revokedConnectors, revokedTasks);
                 ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(
                         error, "leader", "leaderUrl", offset, assignedConnectors, assignedTasks);
-                rebalanceListener.onAssigned(assignment, 0);
+                rebalanceListener.onAssigned(assignment, 3);
+                time.sleep(100L);
                 return null;
             }
         });
@@ -1397,6 +1454,35 @@ public class DistributedHerderTest {
         EasyMock.expect(configBackingStore.snapshot()).andReturn(readToEndSnapshot);
     }
 
+    private void assertStatistics(int expectedEpoch, int completedRebalances, double rebalanceTime,
double millisSinceLastRebalance) {
+        String expectedLeader = completedRebalances <= 0 ? null : "leaderUrl";
+        assertStatistics(expectedLeader, false, expectedEpoch, completedRebalances, rebalanceTime,
millisSinceLastRebalance);
+    }
+
+    private void assertStatistics(String expectedLeader, boolean isRebalancing, int expectedEpoch,
int completedRebalances, double rebalanceTime, double millisSinceLastRebalance) {
+        HerderMetrics herderMetrics = herder.herderMetrics();
+        MetricGroup group = herderMetrics.metricGroup();
+        double epoch = MockConnectMetrics.currentMetricValueAsDouble(metrics, group, "epoch");
+        String leader = MockConnectMetrics.currentMetricValueAsString(metrics, group, "leader-name");
+        double rebalanceCompletedTotal = MockConnectMetrics.currentMetricValueAsDouble(metrics,
group, "completed-rebalances-total");
+        double rebalancing = MockConnectMetrics.currentMetricValueAsDouble(metrics, group,
"rebalancing");
+        double rebalanceTimeMax = MockConnectMetrics.currentMetricValueAsDouble(metrics,
group, "rebalance-max-time-ms");
+        double rebalanceTimeAvg = MockConnectMetrics.currentMetricValueAsDouble(metrics,
group, "rebalance-avg-time-ms");
+        double rebalanceTimeSinceLast = MockConnectMetrics.currentMetricValueAsDouble(metrics,
group, "time-since-last-rebalance-ms");
+
+        assertEquals(expectedEpoch, epoch, 0.0001d);
+        assertEquals(expectedLeader, leader);
+        assertEquals(completedRebalances, rebalanceCompletedTotal, 0.0001d);
+        assertEquals(isRebalancing ? 1.0d : 0.0d, rebalancing, 0.0001d);
+        assertEquals(millisSinceLastRebalance, rebalanceTimeSinceLast, 0.0001d);
+        if (rebalanceTime <= 0L) {
+            assertEquals(Double.NEGATIVE_INFINITY, rebalanceTimeMax, 0.0001d);
+            assertEquals(0.0d, rebalanceTimeAvg, 0.0001d);
+        } else {
+            assertEquals(rebalanceTime, rebalanceTimeMax, 0.0001d);
+            assertEquals(rebalanceTime, rebalanceTimeAvg, 0.0001d);
+        }
+    }
 
     // We need to use a real class here due to some issue with mocking java.lang.Class
     private abstract class BogusSourceConnector extends SourceConnector {


Mime
View raw message