kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-5901: Added Connect metrics specific to source tasks (KIP-196)
Date Thu, 28 Sep 2017 16:56:00 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9d0a89aea -> 89ba0c152


KAFKA-5901: Added Connect metrics specific to source tasks (KIP-196)

Added Connect metrics specific to source tasks, and builds upon #3864 and #3911 that have
already been merged into `trunk`.

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: tedyu <yuzhihong@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #3959 from rhauch/kafka-5901


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/89ba0c15
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/89ba0c15
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/89ba0c15

Branch: refs/heads/trunk
Commit: 89ba0c1525b8f8a4e36d1e1b486ca660d5c24a7b
Parents: 9d0a89a
Author: Randall Hauch <rhauch@gmail.com>
Authored: Thu Sep 28 09:52:08 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu Sep 28 09:53:19 2017 -0700

----------------------------------------------------------------------
 .../kafka/common/metrics/stats/Frequencies.java |   4 +-
 .../kafka/connect/runtime/WorkerConnector.java  |   4 +
 .../kafka/connect/runtime/WorkerSourceTask.java | 109 +++++++++++++++++++
 .../kafka/connect/runtime/WorkerTask.java       |   5 +-
 .../connect/runtime/MockConnectMetrics.java     |  97 +++++++++++++++++
 .../connect/runtime/WorkerSinkTaskTest.java     |   4 +-
 .../connect/runtime/WorkerSourceTaskTest.java   |  57 +++++++++-
 .../kafka/connect/runtime/WorkerTaskTest.java   |   4 +-
 8 files changed, 273 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/89ba0c15/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequencies.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequencies.java
b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequencies.java
index 52178d3..a3d7d25 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequencies.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequencies.java
@@ -74,7 +74,7 @@ public class Frequencies extends SampledStat implements CompoundStat {
      * Create a Frequencies that captures the values in the specified range into the given
number of buckets,
      * where the buckets are centered around the minimum, maximum, and intermediate values.
      *
-     * @param buckets     the number of buckets
+     * @param buckets     the number of buckets; must be at least 1
      * @param min         the minimum value to be captured
      * @param max         the maximum value to be captured
      * @param frequencies the list of {@link Frequency} metrics, which at most should be
one per bucket centered
@@ -90,7 +90,7 @@ public class Frequencies extends SampledStat implements CompoundStat {
                                                        + " must be greater than the minimum
value " + min);
         }
         if (buckets < 1) {
-            throw new IllegalArgumentException("Must be at least 2 buckets");
+            throw new IllegalArgumentException("Must be at least 1 bucket");
         }
         if (buckets < frequencies.length) {
             throw new IllegalArgumentException("More frequencies than buckets");

http://git-wip-us.apache.org/repos/asf/kafka/blob/89ba0c15/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 49dbf4d..874edd5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -300,5 +300,9 @@ public class WorkerConnector {
         boolean isFailed() {
             return state == AbstractStatus.State.FAILED;
         }
+
+        protected MetricGroup metricGroup() {
+            return metricGroup;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/89ba0c15/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 92f7789..0f1874e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -22,8 +22,14 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.Converter;
@@ -60,6 +66,7 @@ class WorkerSourceTask extends WorkerTask {
     private final OffsetStorageReader offsetReader;
     private final OffsetStorageWriter offsetWriter;
     private final Time time;
+    private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
 
     private List<SourceRecord> toSend;
     private boolean lastSendFailed; // Whether the last send failed *synchronously*, i.e.
never made it into the producer's RecordAccumulator
@@ -107,6 +114,7 @@ class WorkerSourceTask extends WorkerTask {
         this.outstandingMessagesBacklog = new IdentityHashMap<>();
         this.flushing = false;
         this.stopRequestedLatch = new CountDownLatch(1);
+        this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
     }
 
     @Override
@@ -126,6 +134,7 @@ class WorkerSourceTask extends WorkerTask {
 
     @Override
     protected void releaseResources() {
+        sourceTaskMetricsGroup.close();
     }
 
     @Override
@@ -165,7 +174,11 @@ class WorkerSourceTask extends WorkerTask {
 
                 if (toSend == null) {
                     log.trace("{} Nothing to send to Kafka. Polling source for additional
records", this);
+                    long start = time.milliseconds();
                     toSend = task.poll();
+                    if (toSend != null) {
+                        recordPollReturned(toSend.size(), time.milliseconds() - start);
+                    }
                 }
                 if (toSend == null)
                     continue;
@@ -269,6 +282,7 @@ class WorkerSourceTask extends WorkerTask {
     }
 
     private synchronized void recordSent(final ProducerRecord<byte[], byte[]> record)
{
+        recordWriteCompleted(1);
         ProducerRecord<byte[], byte[]> removed = outstandingMessages.remove(record);
         // While flushing, we may also see callbacks for items in the backlog
         if (removed == null && flushing)
@@ -421,4 +435,99 @@ class WorkerSourceTask extends WorkerTask {
                 "id=" + id +
                 '}';
     }
+
+    protected void recordPollReturned(int numRecordsInBatch, long duration) {
+        sourceTaskMetricsGroup.recordPoll(numRecordsInBatch, duration);
+    }
+
+    protected void recordWriteCompleted(int numRecords) {
+        sourceTaskMetricsGroup.recordWrite(numRecords);
+    }
+
+    SourceTaskMetricsGroup sourceTaskMetricsGroup() {
+        return sourceTaskMetricsGroup;
+    }
+
+    static class SourceTaskMetricsGroup {
+        private final MetricGroup metricGroup;
+        private final Sensor sourceRecordPoll;
+        private final Sensor sourceRecordWrite;
+        private final Sensor pollTime;
+
+        public SourceTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics)
{
+            metricGroup = connectMetrics.group("source-task-metrics",
+                    "connector", id.connector(), "task", Integer.toString(id.task()));
+
+            sourceRecordPoll = metricGroup.sensor("source-record-poll");
+            sourceRecordPoll.add(metricGroup.metricName("source-record-poll-rate",
+                    "The average per-second number of records produced/polled (before transformation)
by this " +
+                            "task belonging to the named source connector in this worker."),
+                    new Rate());
+            sourceRecordPoll.add(metricGroup.metricName("source-record-poll-total",
+                    "The number of records produced/polled (before transformation) by this
task belonging to " +
+                            "the named source connector in this worker, since the task was
last restarted."),
+                    new Total());
+
+            sourceRecordWrite = metricGroup.sensor("source-record-write");
+            sourceRecordWrite.add(metricGroup.metricName("source-record-write-rate",
+                    "The average per-second number of records output from the transformations
and written to " +
+                            "Kafka for this task belonging to the named source connector
in this worker. " +
+                            "This is after transformations are applied and excludes any records
filtered out " +
+                            "by the transformations."),
+                    new Rate());
+            sourceRecordWrite.add(metricGroup.metricName("source-record-write-total",
+                    "The number of records output from the transformations and written to
Kafka for this task " +
+                            "belonging to the named source connector in this worker, since
the task was last " +
+                            "restarted."),
+                    new Total());
+
+
+            pollTime = metricGroup.sensor("poll-batch-time");
+            pollTime.add(metricGroup.metricName("poll-batch-max-time-ms",
+                    "The maximum time in milliseconds taken by this task to poll for a batch
of source records"),
+                    new Max());
+            pollTime.add(metricGroup.metricName("poll-batch-avg-time-ms",
+                    "The average time in milliseconds taken by this task to poll for a batch
of source records"),
+                    new Avg());
+
+//            int buckets = 100;
+//            MetricName p99 = metricGroup.metricName("poll-batch-99p-time-ms",
+//                    "The 99th percentile time in milliseconds spent by this task to poll
for a batch of source records");
+//            MetricName p95 = metricGroup.metricName("poll-batch-95p-time-ms",
+//                    "The 95th percentile time in milliseconds spent by this task to poll
for a batch of source records");
+//            MetricName p90 = metricGroup.metricName("poll-batch-90p-time-ms",
+//                    "The 90th percentile time in milliseconds spent by this task to poll
for a batch of source records");
+//            MetricName p75 = metricGroup.metricName("poll-batch-75p-time-ms",
+//                    "The 75th percentile time in milliseconds spent by this task to poll
for a batch of source records");
+//            MetricName p50 = metricGroup.metricName("poll-batch-50p-time-ms",
+//                    "The 50th percentile (median) time in milliseconds spent by this task
to poll for a batch of source records");
+//            Percentiles pollTimePercentiles = new Percentiles(4 * buckets,
+//                                                            0.0,
+//                                                            10*1000.0,
+//                                                            BucketSizing.LINEAR,
+//                                                            new Percentile(p50, 50),
+//                                                            new Percentile(p75, 75),
+//                                                            new Percentile(p90, 90),
+//                                                            new Percentile(p95, 95),
+//                                                            new Percentile(p99, 99));
+//            pollTime.add(pollTimePercentiles);
+        }
+
+        void close() {
+            metricGroup.close();
+        }
+
+        void recordPoll(int batchSize, long duration) {
+            sourceRecordPoll.record(batchSize);
+            pollTime.record(duration);
+        }
+
+        void recordWrite(int recordCount) {
+            sourceRecordWrite.record(recordCount);
+        }
+
+        protected MetricGroup metricGroup() {
+            return metricGroup;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/89ba0c15/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 44703dd..e4af516 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -436,9 +436,8 @@ abstract class WorkerTask implements Runnable {
             return taskStateTimer.currentState();
         }
 
-        double currentMetricValue(String name) {
-            MetricName metricName = metricGroup.metricName(name, "desc");
-            return metricGroup.metrics().metric(metricName).value();
+        protected MetricGroup metricGroup() {
+            return metricGroup;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/89ba0c15/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
index 0a61e10..a4592b8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
@@ -16,19 +16,40 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.connect.util.MockTime;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+/**
+ * A specialization of {@link ConnectMetrics} that uses a custom {@link MetricsReporter}
to capture the metrics
+ * that were created, and makes those metrics available even after the metrics were removed
from the
+ * {@link org.apache.kafka.common.metrics.Metrics} registry.
+ *
+ * This is needed because many of the Connect metric groups are specific to connectors and/or
tasks, and therefore
+ * their metrics are removed from the {@link org.apache.kafka.common.metrics.Metrics} registry
when the connector
+ * and tasks are closed. This instance keeps track of the metrics that were created so that
it is possible for
+ * tests to {@link #currentMetricValue(MetricGroup, String) read the metrics' value} even
after the connector
+ * and/or tasks have been closed.
+ *
+ * If the same metric is created a second time (e.g., a worker task is re-created), the new
metric will replace
+ * the previous metric in the custom reporter.
+ */
 public class MockConnectMetrics extends ConnectMetrics {
 
     private static final Map<String, String> DEFAULT_WORKER_CONFIG = new HashMap<>();
+
     static {
         DEFAULT_WORKER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
         DEFAULT_WORKER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
         DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
         DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_WORKER_CONFIG.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
     }
 
     public MockConnectMetrics() {
@@ -39,4 +60,80 @@ public class MockConnectMetrics extends ConnectMetrics {
     public MockTime time() {
         return (MockTime) super.time();
     }
+
+    /**
+     * Get the current value of the named metric, which may have already been removed from
the
+     * {@link org.apache.kafka.common.metrics.Metrics} but will have been captured before
it was removed.
+     *
+     * @param metricGroup the metric group that contained the metric
+     * @param name        the name of the metric
+     * @return the current value of the metric
+     */
+    public double currentMetricValue(MetricGroup metricGroup, String name) {
+        MetricName metricName = metricGroup.metricName(name, "desc");
+        for (MetricsReporter reporter : metrics().reporters()) {
+            if (reporter instanceof MockMetricsReporter) {
+                return ((MockMetricsReporter) reporter).currentMetricValue(metricName);
+            }
+        }
+        return Double.NEGATIVE_INFINITY;
+    }
+
+    /**
+     * Determine if the {@link KafkaMetric} with the specified name exists within the
+     * {@link org.apache.kafka.common.metrics.Metrics} instance.
+     *
+     * @param metricGroup the metric group that contained the metric
+     * @param name        the name of the metric
+     * @return true if the metric is still registered, or false if it has been removed
+     */
+    public boolean metricExists(MetricGroup metricGroup, String name) {
+        MetricName metricName = metricGroup.metricName(name, "desc");
+        KafkaMetric metric = metricGroup.metrics().metric(metricName);
+        return metric != null;
+    }
+
+    public static class MockMetricsReporter implements MetricsReporter {
+        private Map<MetricName, KafkaMetric> metricsByName = new HashMap<>();
+
+        public MockMetricsReporter() {
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            // do nothing
+        }
+
+        @Override
+        public void init(List<KafkaMetric> metrics) {
+            for (KafkaMetric metric : metrics) {
+                metricsByName.put(metric.metricName(), metric);
+            }
+        }
+
+        @Override
+        public void metricChange(KafkaMetric metric) {
+            metricsByName.put(metric.metricName(), metric);
+        }
+
+        @Override
+        public void metricRemoval(KafkaMetric metric) {
+            // don't remove metrics, or else we won't be able to access them after the metric
group is closed
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+
+        /**
+         * Get the current value of the metric.
+         *
+         * @param metricName the name of the metric that was registered most recently
+         * @return the current value of the metric
+         */
+        public double currentMetricValue(MetricName metricName) {
+            return metricsByName.get(metricName).value();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/89ba0c15/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 524c022..5f3f888 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -113,7 +113,7 @@ public class WorkerSinkTaskTest {
     private SinkTask sinkTask;
     private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
     private WorkerConfig workerConfig;
-    private ConnectMetrics metrics;
+    private MockConnectMetrics metrics;
     @Mock
     private PluginClassLoader pluginLoader;
     @Mock
@@ -902,7 +902,7 @@ public class WorkerSinkTaskTest {
         assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been
cleared
         assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask,
"lastCommittedOffsets"));
         assertEquals(0, workerTask.commitFailures());
-        assertEquals(1.0, workerTask.taskMetricsGroup().currentMetricValue("batch-size-max"),
0.0001);
+        assertEquals(1.0, metrics.currentMetricValue(workerTask.taskMetricsGroup().metricGroup(),
"batch-size-max"), 0.0001);
 
         PowerMock.verifyAll();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/89ba0c15/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index cf19522..8a23ad5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -87,7 +89,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private WorkerConfig config;
     private Plugins plugins;
-    private ConnectMetrics metrics;
+    private MockConnectMetrics metrics;
     @Mock private SourceTask sourceTask;
     @Mock private Converter keyConverter;
     @Mock private Converter valueConverter;
@@ -270,6 +272,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         assertTrue(workerTask.awaitStop(1000));
 
         taskFuture.get();
+        assertPollMetrics(10);
 
         PowerMock.verifyAll();
     }
@@ -318,6 +321,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         assertTrue(workerTask.awaitStop(1000));
 
         taskFuture.get();
+        assertPollMetrics(0);
 
         PowerMock.verifyAll();
     }
@@ -362,6 +366,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         assertTrue(workerTask.awaitStop(1000));
 
         taskFuture.get();
+        assertPollMetrics(1);
 
         PowerMock.verifyAll();
     }
@@ -406,10 +411,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         assertTrue(workerTask.awaitStop(1000));
 
         taskFuture.get();
+        assertPollMetrics(1);
 
         PowerMock.verifyAll();
     }
-    
+
     @Test
     public void testSendRecordsConvertsData() throws Exception {
         createWorkerTask();
@@ -602,6 +608,21 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testMetricsGroup() {
+        SourceTaskMetricsGroup group = new SourceTaskMetricsGroup(taskId, metrics);
+        for (int i = 0; i != 10; ++i) {
+            group.recordPoll(100, 1000 + i * 100);
+            group.recordWrite(10);
+        }
+        assertEquals(1900.0, metrics.currentMetricValue(group.metricGroup(), "poll-batch-max-time-ms"),
0.001d);
+        assertEquals(1450.0, metrics.currentMetricValue(group.metricGroup(), "poll-batch-avg-time-ms"),
0.001d);
+        assertEquals(33.333, metrics.currentMetricValue(group.metricGroup(), "source-record-poll-rate"),
0.001d);
+        assertEquals(1000, metrics.currentMetricValue(group.metricGroup(), "source-record-poll-total"),
0.001d);
+        assertEquals(3.3333, metrics.currentMetricValue(group.metricGroup(), "source-record-write-rate"),
0.001d);
+        assertEquals(100, metrics.currentMetricValue(group.metricGroup(), "source-record-write-total"),
0.001d);
+    }
+
     private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throws InterruptedException
{
         final CountDownLatch latch = new CountDownLatch(minimum);
         // Note that we stub these to allow any number of calls because the thread will continue
to
@@ -613,6 +634,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
                     public List<SourceRecord> answer() throws Throwable {
                         count.incrementAndGet();
                         latch.countDown();
+                        Thread.sleep(10);
                         return RECORDS;
                     }
                 });
@@ -769,6 +791,37 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         }
     }
 
+    private void assertPollMetrics(int minimumPollCountExpected) {
+        MetricGroup sourceTaskGroup = workerTask.sourceTaskMetricsGroup().metricGroup();
+        MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
+        double pollRate = metrics.currentMetricValue(sourceTaskGroup, "source-record-poll-rate");
+        double pollTotal = metrics.currentMetricValue(sourceTaskGroup, "source-record-poll-total");
+        if (minimumPollCountExpected > 0) {
+            assertEquals(RECORDS.size(), metrics.currentMetricValue(taskGroup, "batch-size-max"),
0.000001d);
+            assertEquals(RECORDS.size(), metrics.currentMetricValue(taskGroup, "batch-size-avg"),
0.000001d);
+            assertTrue(pollRate > 0.0d);
+        } else {
+            assertTrue(pollRate == 0.0d);
+        }
+        assertTrue(pollTotal >= minimumPollCountExpected);
+
+        double writeRate = metrics.currentMetricValue(sourceTaskGroup, "source-record-write-rate");
+        double writeTotal = metrics.currentMetricValue(sourceTaskGroup, "source-record-write-total");
+        if (minimumPollCountExpected > 0) {
+            assertTrue(writeRate > 0.0d);
+        } else {
+            assertTrue(writeRate == 0.0d);
+        }
+        assertTrue(writeTotal >= minimumPollCountExpected);
+
+        double pollBatchTimeMax = metrics.currentMetricValue(sourceTaskGroup, "poll-batch-max-time-ms");
+        double pollBatchTimeAvg = metrics.currentMetricValue(sourceTaskGroup, "poll-batch-avg-time-ms");
+        if (minimumPollCountExpected > 0) {
+            assertTrue(pollBatchTimeMax >= 0.0d);
+        }
+        assertTrue(pollBatchTimeAvg >= 0.0d);
+    }
+
     private abstract static class TestSourceTask extends SourceTask {
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/89ba0c15/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index e55c7fa..96746a5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -314,8 +314,8 @@ public class WorkerTaskTest {
         long totalTime = 27000L;
         double pauseTimeRatio = (double) (3000L + 5000L) / totalTime;
         double runningTimeRatio = (double) (2000L + 4000L + 6000L) / totalTime;
-        assertEquals(pauseTimeRatio, group.currentMetricValue("pause-ratio"), 0.000001d);
-        assertEquals(runningTimeRatio, group.currentMetricValue("running-ratio"), 0.000001d);
+        assertEquals(pauseTimeRatio, metrics.currentMetricValue(group.metricGroup(), "pause-ratio"),
0.000001d);
+        assertEquals(runningTimeRatio, metrics.currentMetricValue(group.metricGroup(), "running-ratio"),
0.000001d);
     }
 
     private static abstract class TestSinkTask extends SinkTask {


Mime
View raw message