Repository: kafka
Updated Branches:
refs/heads/trunk 13b8fb295 -> 5db2c99e1
KAFKA-2698: Add paused() method to o.a.k.c.c.Consumer
Author: Tom Lee <github@tomlee.co>
Reviewers: Onur Karaman <okaraman@linkedin.com>, Jiangjie Qin <jiangjie@linkedin.com>,
Grant Henke <ghenke@cloudera.com>, Jason Gustafson <jason@confluent.io>, Guozhang
Wang <wangguoz@gmail.com>
Closes #962 from hachikuji/KAFKA-2698
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5db2c99e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5db2c99e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5db2c99e
Branch: refs/heads/trunk
Commit: 5db2c99e15a4467489b495144815ac8334c2c5d4
Parents: 13b8fb2
Author: Tom Lee <github@tomlee.co>
Authored: Wed Feb 24 19:41:57 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Feb 24 19:41:57 2016 -0800
----------------------------------------------------------------------
.../apache/kafka/clients/consumer/Consumer.java | 5 +++
.../kafka/clients/consumer/KafkaConsumer.java | 15 ++++++++
.../consumer/internals/SubscriptionState.java | 12 ++++++
.../clients/consumer/KafkaConsumerTest.java | 39 +++++++++++++++-----
4 files changed, 62 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5db2c99e/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index c9f114d..c0f3030 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -142,6 +142,11 @@ public interface Consumer<K, V> extends Closeable {
public void pause(TopicPartition... partitions);
/**
+ * @see KafkaConsumer#paused()
+ */
+ public Set<TopicPartition> paused();
+
+ /**
* @see KafkaConsumer#resume(TopicPartition...)
*/
public void resume(TopicPartition... partitions);
http://git-wip-us.apache.org/repos/asf/kafka/blob/5db2c99e/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f907922..e1b07d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1237,6 +1237,21 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
+ * Get the set of partitions that were previously paused by a call to {@link #pause(TopicPartition...)}.
+ *
+ * @return The set of paused partitions
+ */
+ @Override
+ public Set<TopicPartition> paused() {
+ acquire();
+ try {
+ return Collections.unmodifiableSet(subscriptions.pausedPartitions());
+ } finally {
+ release();
+ }
+ }
+
+ /**
* Resume specified partitions which have been paused with {@link #pause(TopicPartition...)}. New calls to
* {@link #poll(long)} will return records from these partitions if there are any to be fetched.
* If the partitions were not previously paused, this method is a no-op.
http://git-wip-us.apache.org/repos/asf/kafka/blob/5db2c99e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 9efaf8c..af26357 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -200,6 +200,18 @@ public class SubscriptionState {
return this.subscription;
}
+ public Set<TopicPartition> pausedPartitions() {
+ HashSet<TopicPartition> paused = new HashSet<>();
+ for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) {
+ final TopicPartition tp = entry.getKey();
+ final TopicPartitionState state = entry.getValue();
+ if (state.paused) {
+ paused.add(tp);
+ }
+ }
+ return paused;
+ }
+
/**
* Get the subscription for the group. For the leader, this will include the union of the
* subscriptions of all group members. For followers, it is just that member's subscription.
http://git-wip-us.apache.org/repos/asf/kafka/blob/5db2c99e/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c65fd73..2ac024f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -57,13 +57,7 @@ public class KafkaConsumerTest {
@Test
public void testSubscription() {
- Properties props = new Properties();
- props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSubscription");
- props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
- props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
-
- KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
- props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ KafkaConsumer<byte[], byte[]> consumer = newConsumer();
consumer.subscribe(Collections.singletonList(topic));
Assert.assertEquals(Collections.singleton(topic), consumer.subscription());
@@ -89,8 +83,7 @@ public class KafkaConsumerTest {
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
- KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
- props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ KafkaConsumer<byte[], byte[]> consumer = newConsumer();
consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0)));
consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
}
@@ -116,4 +109,32 @@ public class KafkaConsumerTest {
MockConsumerInterceptor.resetCounters();
}
}
+
+ @Test
+ public void testPause() {
+ KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+
+ consumer.assign(Collections.singletonList(tp0));
+ Assert.assertEquals(Collections.singleton(tp0), consumer.assignment());
+ Assert.assertTrue(consumer.paused().isEmpty());
+
+ consumer.pause(tp0);
+ Assert.assertEquals(Collections.singleton(tp0), consumer.paused());
+
+ consumer.resume(tp0);
+ Assert.assertTrue(consumer.paused().isEmpty());
+
+ consumer.unsubscribe();
+ Assert.assertTrue(consumer.paused().isEmpty());
+ }
+
+ private KafkaConsumer<byte[], byte[]> newConsumer() {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my.consumer");
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+
+ return new KafkaConsumer<byte[], byte[]>(
+ props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ }
}
|