kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-4381: Add per partition lag metrics to the consumer
Date Fri, 13 Jan 2017 19:38:31 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e90db3e3a -> 88fdca26a


KAFKA-4381: Add per partition lag metrics to the consumer

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2329 from becketqin/KAFKA-4381


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/88fdca26
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/88fdca26
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/88fdca26

Branch: refs/heads/trunk
Commit: 88fdca26a0881d12e9f3ba942cd580c8e8154025
Parents: e90db3e
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Fri Jan 13 11:38:25 2017 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Fri Jan 13 11:38:25 2017 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |   2 +-
 .../kafka/clients/consumer/MockConsumer.java    |   8 +-
 .../clients/consumer/internals/Fetcher.java     |  44 +++++++-
 .../consumer/internals/SubscriptionState.java   |  27 ++++-
 .../kafka/common/metrics/stats/Value.java       |  33 ++++++
 .../clients/consumer/KafkaConsumerTest.java     |   8 +-
 .../internals/ConsumerCoordinatorTest.java      |  10 +-
 .../clients/consumer/internals/FetcherTest.java |   6 +-
 .../internals/SubscriptionStateTest.java        |   3 +-
 .../kafka/api/BaseConsumerTest.scala            | 102 ++++++++++++++++++-
 10 files changed, 218 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/88fdca26/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d3bdf6e..1b033e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -663,7 +663,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
-            this.subscriptions = new SubscriptionState(offsetResetStrategy);
+            this.subscriptions = new SubscriptionState(offsetResetStrategy, metrics);
             List<PartitionAssignor> assignors = config.getConfiguredInstances(
                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                     PartitionAssignor.class);

http://git-wip-us.apache.org/repos/asf/kafka/blob/88fdca26/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 9d57f83..95e3830 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
  * specific language governing permissions and limitations under the License.
@@ -34,6 +34,8 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
+import org.apache.kafka.common.metrics.Metrics;
+
 
 /**
  * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka.
This class is <i> not
@@ -57,7 +59,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private AtomicBoolean wakeup;
 
     public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
-        this.subscriptions = new SubscriptionState(offsetResetStrategy);
+        this.subscriptions = new SubscriptionState(offsetResetStrategy, new Metrics());
         this.partitions = new HashMap<>();
         this.records = new HashMap<>();
         this.paused = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/88fdca26/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 48db458..3a916a4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.LogEntry;
@@ -500,6 +501,11 @@ public class Fetcher<K, V> {
                         "position to {}", position, partitionRecords.partition, nextOffset);
 
                 subscriptions.position(partitionRecords.partition, nextOffset);
+                Long partitionLag = subscriptions.partitionLag(partitionRecords.partition);
+                if (partitionLag != null) {
+                    this.sensors.recordsFetchLag.record(partitionLag);
+                    this.sensors.recordPartitionFetchLag(partitionRecords.partition, partitionLag);
+                }
                 return partRecords;
             } else {
                 // these records aren't next in line based on the last consumed position,
ignore them
@@ -763,12 +769,23 @@ public class Fetcher<K, V> {
                 if (!parsed.isEmpty()) {
                     log.trace("Adding fetched record for partition {} with offset {} to buffered
record list", tp, position);
                     parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
-                    ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
-                    this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
-                } else if (partition.highWatermark >= 0) {
-                    log.trace("Received empty fetch response for partition {} with offset
{}", tp, position);
-                    this.sensors.recordsFetchLag.record(partition.highWatermark - fetchOffset);
                 }
+
+                if (partition.highWatermark >= 0) {
+                    log.trace("Received {} records in fetch response for partition {} with
offset {}", parsed.size(), tp, position);
+                    Long partitionLag = subscriptions.partitionLag(tp);
+                    subscriptions.updateHighWatermark(tp, partition.highWatermark);
+                    // If the partition lag is null, that means this is the first fetch response
for this partition.
+                    // We update the lag here to create the lag metric. This is to handle
the case that there is no
+                    // message consumed by the end user from this partition. If there are
messages returned from the
+                    // partition, the lag will be updated when those messages are consumed
by the end user.
+                    if (partitionLag == null) {
+                        partitionLag = subscriptions.partitionLag(tp);
+                        this.sensors.recordsFetchLag.record(partitionLag);
+                        this.sensors.recordPartitionFetchLag(tp, partitionLag);
+                    }
+                }
+
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
                 log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
                 this.metadata.requestUpdate();
@@ -1057,6 +1074,23 @@ public class Fetcher<K, V> {
             }
             recordsFetched.record(records);
         }
+
+        public void recordPartitionFetchLag(TopicPartition tp, long lag) {
+            String name = tp + ".records-lag";
+            Sensor recordsLag = this.metrics.getSensor(name);
+            if (recordsLag == null) {
+                recordsLag = this.metrics.sensor(name);
+                recordsLag.add(this.metrics.metricName(name, this.metricGrpName, "The latest
lag of the partition"),
+                               new Value());
+                recordsLag.add(this.metrics.metricName(name + "-max",
+                        this.metricGrpName,
+                        "The max lag of the partition"), new Max());
+                recordsLag.add(this.metrics.metricName(name + "-avg",
+                        this.metricGrpName,
+                        "The average lag of the partition"), new Avg());
+            }
+            recordsLag.record(lag);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/88fdca26/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index a476221..ad4f20b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -17,6 +17,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.internals.PartitionStates;
+import org.apache.kafka.common.metrics.Metrics;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -80,13 +81,16 @@ public class SubscriptionState {
     /* Listener to be invoked when assignment changes */
     private ConsumerRebalanceListener listener;
 
-    public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
+    private final Metrics metrics;
+
+    public SubscriptionState(OffsetResetStrategy defaultResetStrategy, Metrics metrics) {
         this.defaultResetStrategy = defaultResetStrategy;
         this.subscription = Collections.emptySet();
         this.assignment = new PartitionStates<>();
         this.groupSubscription = new HashSet<>();
         this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to
fetch offset upon starting up
         this.subscribedPattern = null;
+        this.metrics = metrics;
         this.subscriptionType = SubscriptionType.NONE;
     }
 
@@ -156,6 +160,7 @@ public class SubscriptionState {
         setSubscriptionType(SubscriptionType.USER_ASSIGNED);
 
         if (!this.assignment.partitionSet().equals(partitions)) {
+            removeAllLagSensors(partitions);
             Map<TopicPartition, TopicPartitionState> partitionToState = new HashMap<>();
             for (TopicPartition partition : partitions) {
                 TopicPartitionState state = assignment.stateValue(partition);
@@ -175,6 +180,8 @@ public class SubscriptionState {
     public void assignFromSubscribed(Collection<TopicPartition> assignments) {
         if (!this.partitionsAutoAssigned())
             throw new IllegalArgumentException("Attempt to dynamically assign partitions
while manual assignment in use");
+        Set<TopicPartition> newAssignment = new HashSet<>(assignments);
+        removeAllLagSensors(newAssignment);
 
         for (TopicPartition tp : assignments)
             if (!this.subscription.contains(tp.topic()))
@@ -185,6 +192,13 @@ public class SubscriptionState {
         this.needsFetchCommittedOffsets = true;
     }
 
+    private void removeAllLagSensors(Set<TopicPartition> preservedPartitions) {
+        for (TopicPartition tp : assignment.partitionSet()) {
+            if (!preservedPartitions.contains(tp))
+                metrics.removeSensor(tp + ".records-lag");
+        }
+    }
+
     private Map<TopicPartition, TopicPartitionState> partitionToStateMap(Collection<TopicPartition>
assignments) {
         Map<TopicPartition, TopicPartitionState> map = new HashMap<>(assignments.size());
         for (TopicPartition tp : assignments)
@@ -305,6 +319,15 @@ public class SubscriptionState {
         return assignedState(tp).position;
     }
 
+    public Long partitionLag(TopicPartition tp) {
+        TopicPartitionState topicPartitionState = assignedState(tp);
+        return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark
- topicPartitionState.position;
+    }
+
+    public void updateHighWatermark(TopicPartition tp, long highWatermark) {
+        assignedState(tp).highWatermark = highWatermark;
+    }
+
     public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
         Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
         for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates())
{
@@ -380,6 +403,7 @@ public class SubscriptionState {
 
     private static class TopicPartitionState {
         private Long position; // last consumed position
+        private Long highWatermark; // the high watermark from last fetch
         private OffsetAndMetadata committed;  // last committed position
         private boolean paused;  // whether this partition has been paused by the user
         private OffsetResetStrategy resetStrategy;  // the strategy to use if the offset
needs resetting
@@ -387,6 +411,7 @@ public class SubscriptionState {
         public TopicPartitionState() {
             this.paused = false;
             this.position = null;
+            this.highWatermark = null;
             this.committed = null;
             this.resetStrategy = null;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/88fdca26/clients/src/main/java/org/apache/kafka/common/metrics/stats/Value.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Value.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Value.java
new file mode 100644
index 0000000..d83a023
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Value.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+/**
+ * An instantaneous value.
+ */
+public class Value implements MeasurableStat {
+    private double value = 0;
+
+    @Override
+    public double measure(MetricConfig config, long now) {
+        return value;
+    }
+
+    @Override
+    public void record(MetricConfig config, double value, long timeMs) {
+        this.value = value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/88fdca26/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index b426c9a..bf91c28 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -108,7 +108,7 @@ public class KafkaConsumerTest {
 
     private final String topic3 = "test3";
     private final TopicPartition t3p0 = new TopicPartition(topic3, 0);
-    
+
     @Rule
     public ExpectedException expectedException = ExpectedException.none();
 
@@ -692,7 +692,7 @@ public class KafkaConsumerTest {
         ConsumerRecords<String, String> records = consumer.poll(0);
         assertEquals(5, records.count());
     }
-    
+
     @Test
     public void testPollThrowsInterruptExceptionIfInterrupted() throws Exception {
         int rebalanceTimeoutMs = 60000;
@@ -712,7 +712,7 @@ public class KafkaConsumerTest {
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata,
assignor,
                 rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, 0);
-        
+
         consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
         prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
 
@@ -1434,7 +1434,7 @@ public class KafkaConsumerTest {
         ConsumerInterceptors<String, String> interceptors = null;
 
         Metrics metrics = new Metrics();
-        SubscriptionState subscriptions = new SubscriptionState(autoResetStrategy);
+        SubscriptionState subscriptions = new SubscriptionState(autoResetStrategy, metrics);
         ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata,
time, retryBackoffMs, requestTimeoutMs);
         ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(
                 consumerClient,

http://git-wip-us.apache.org/repos/asf/kafka/blob/88fdca26/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index b533efd..fa2752c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -113,7 +113,7 @@ public class ConsumerCoordinatorTest {
     public void setup() {
         this.time = new MockTime();
         this.client = new MockClient(time);
-        this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
+        this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST, metrics);
         this.metadata = new Metadata(0, Long.MAX_VALUE);
         this.metadata.update(cluster, time.milliseconds());
         this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
@@ -279,7 +279,7 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test(expected = ApiException.class)
-    public void testJoinGroupInvalidGroupId() { 
+    public void testJoinGroupInvalidGroupId() {
         final String consumerId = "leader";
 
         subscriptions.subscribe(singleton(topicName), rebalanceListener);
@@ -619,7 +619,7 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
-    public void testMetadataChangeTriggersRebalance() { 
+    public void testMetadataChangeTriggersRebalance() {
         final String consumerId = "consumer";
 
         // ensure metadata is up-to-date for leader
@@ -703,7 +703,7 @@ public class ConsumerCoordinatorTest {
 
 
     @Test
-    public void testExcludeInternalTopicsConfigOption() { 
+    public void testExcludeInternalTopicsConfigOption() {
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
 
         metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2),
time.milliseconds());
@@ -720,7 +720,7 @@ public class ConsumerCoordinatorTest {
 
         assertTrue(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
     }
-    
+
     @Test
     public void testRejoinGroup() {
         String otherTopic = "otherTopic";

http://git-wip-us.apache.org/repos/asf/kafka/blob/88fdca26/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 7941e44..cffa59f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -90,9 +90,9 @@ public class FetcherTest {
     private MockClient client = new MockClient(time, metadata);
     private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
     private Node node = cluster.nodes().get(0);
-    private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
-    private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
     private Metrics metrics = new Metrics(time);
+    private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST,
metrics);
+    private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE,
metrics);
     private static final double EPSILON = 0.0001;
     private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata,
time, 100, 1000);
 
@@ -657,7 +657,7 @@ public class FetcherTest {
         for (int v = 0; v < 3; v++)
             builder.appendWithOffset((long) v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d",
v).getBytes());
         fetchRecords(builder.build(), Errors.NONE.code(), 200L, 0);
-        assertEquals(198, recordsFetchLagMax.value(), EPSILON);
+        assertEquals(197, recordsFetchLagMax.value(), EPSILON);
     }
 
     private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(MemoryRecords
records, short error, long hw, int throttleTime) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/88fdca26/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index a950cad..ef75754 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -36,7 +37,7 @@ import static org.junit.Assert.assertTrue;
 
 public class SubscriptionStateTest {
 
-    private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST);
+    private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST,
new Metrics());
     private final String topic = "test";
     private final String topic1 = "test1";
     private final TopicPartition tp0 = new TopicPartition(topic, 0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/88fdca26/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 732b99f..4bc1678 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -13,16 +13,16 @@
 package kafka.api
 
 import java.util
+import java.util.Collections
 
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.{PartitionInfo, TopicPartition}
+import org.apache.kafka.common.{MetricName, PartitionInfo, TopicPartition}
 import kafka.utils.{Logging, ShutdownableThread, TestUtils}
 import kafka.common.Topic
 import kafka.server.KafkaConfig
-
 import org.junit.Assert._
 import org.junit.{Before, Test}
 
@@ -36,6 +36,7 @@ import org.apache.kafka.common.errors.WakeupException
  */
 abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
 
+  val epsilon = 0.1
   val producerCount = 1
   val consumerCount = 2
   val serverCount = 3
@@ -117,6 +118,103 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with
Logging {
     assertEquals(1, listener.callsToRevoked)
   }
 
+  @Test
+  def testPerPartitionLagMetricsCleanUpWithSubscribe() {
+    val numMessages = 1000
+    val topic2 = "topic2"
+    TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)
+    // send some messages.
+    sendRecords(numMessages, tp)
+    // Test subscribe
+    // Create a consumer and consumer some messages.
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithSubscribe")
+    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithSubscribe")
+    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new
ByteArrayDeserializer())
+    try {
+      val listener0 = new TestConsumerReassignmentListener
+      consumer.subscribe(List(topic, topic2).asJava, listener0)
+      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+      TestUtils.waitUntilTrue(() => {
+          records = consumer.poll(100)
+          !records.records(tp).isEmpty
+        }, "Consumer did not consume any message before timeout.")
+      assertEquals("should be assigned once", 1, listener0.callsToAssigned)
+      // Verify the metric exist.
+      val tags = Collections.singletonMap("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
+      val fetchLag0 = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics",
"", tags))
+      assertNotNull(fetchLag0)
+      val expectedLag = numMessages - records.count
+      assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag0.value, epsilon)
+
+      // Remove topic from subscription
+      consumer.subscribe(List(topic2).asJava, listener0)
+      TestUtils.waitUntilTrue(() => {
+        consumer.poll(100)
+        listener0.callsToAssigned >= 2
+      }, "Expected rebalance did not occur.")
+      // Verify the metric has gone
+      assertNull(consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics",
"", tags)))
+      assertNull(consumer.metrics.get(new MetricName(tp2 + ".records-lag", "consumer-fetch-manager-metrics",
"", tags)))
+    } finally {
+      consumer.close()
+    }
+  }
+
+  @Test
+  def testPerPartitionLagMetricsCleanUpWithAssign() {
+    val numMessages = 1000
+    // Test assign
+    // send some messages.
+    sendRecords(numMessages, tp)
+    sendRecords(numMessages, tp2)
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign")
+    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign")
+    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new
ByteArrayDeserializer())
+    try {
+      consumer.assign(List(tp).asJava)
+      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+      TestUtils.waitUntilTrue(() => {
+          records = consumer.poll(100)
+          !records.records(tp).isEmpty
+        }, "Consumer did not consume any message before timeout.")
+      // Verify the metric exist.
+      val tags = Collections.singletonMap("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
+      val fetchLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics",
"", tags))
+      assertNotNull(fetchLag)
+      val expectedLag = numMessages - records.count
+      assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag.value, epsilon)
+
+      consumer.assign(List(tp2).asJava)
+      TestUtils.waitUntilTrue(() => !consumer.poll(100).isEmpty, "Consumer did not consume
any message before timeout.")
+      assertNull(consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics",
"", tags)))
+    } finally {
+      consumer.close()
+    }
+  }
+
+  @Test
+  def testPerPartitionLagWithMaxPollRecords() {
+    val numMessages = 1000
+    val maxPollRecords = 10
+    sendRecords(numMessages, tp)
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLagWithMaxPollRecords")
+    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLagWithMaxPollRecords")
+    consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
+    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new
ByteArrayDeserializer())
+    consumer.assign(List(tp).asJava)
+    try {
+      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+      TestUtils.waitUntilTrue(() => {
+          records = consumer.poll(100)
+          !records.isEmpty
+        }, "Consumer did not consume any message before timeout.")
+      val tags = Collections.singletonMap("client-id", "testPerPartitionLagWithMaxPollRecords")
+      val lag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics",
"", tags))
+      assertEquals(s"The lag should be ${numMessages - records.count}", numMessages - records.count,
lag.value, epsilon)
+    } finally {
+      consumer.close()
+    }
+  }
 
   protected class TestConsumerReassignmentListener extends ConsumerRebalanceListener {
     var callsToAssigned = 0


Mime
View raw message