kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2698: Add paused() method to o.a.k.c.c.Consumer
Date Thu, 25 Feb 2016 03:42:00 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 13b8fb295 -> 5db2c99e1


KAFKA-2698: Add paused() method to o.a.k.c.c.Consumer

Author: Tom Lee <github@tomlee.co>

Reviewers: Onur Karaman <okaraman@linkedin.com>, Jiangjie Qin <jiangjie@linkedin.com>,
Grant Henke <ghenke@cloudera.com>, Jason Gustafson <jason@confluent.io>, Guozhang
Wang <wangguoz@gmail.com>

Closes #962 from hachikuji/KAFKA-2698


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5db2c99e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5db2c99e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5db2c99e

Branch: refs/heads/trunk
Commit: 5db2c99e15a4467489b495144815ac8334c2c5d4
Parents: 13b8fb2
Author: Tom Lee <github@tomlee.co>
Authored: Wed Feb 24 19:41:57 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Feb 24 19:41:57 2016 -0800

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/Consumer.java |  5 +++
 .../kafka/clients/consumer/KafkaConsumer.java   | 15 ++++++++
 .../consumer/internals/SubscriptionState.java   | 12 ++++++
 .../clients/consumer/KafkaConsumerTest.java     | 39 +++++++++++++++-----
 4 files changed, 62 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5db2c99e/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index c9f114d..c0f3030 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -142,6 +142,11 @@ public interface Consumer<K, V> extends Closeable {
     public void pause(TopicPartition... partitions);
 
     /**
+     * @see KafkaConsumer#paused()
+     */
+    public Set<TopicPartition> paused();
+
+    /**
      * @see KafkaConsumer#resume(TopicPartition...)
      */
     public void resume(TopicPartition... partitions);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5db2c99e/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f907922..e1b07d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1237,6 +1237,21 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
     }
 
     /**
+     * Get the set of partitions that were previously paused by a call to {@link #pause(TopicPartition...)}.
+     *
+     * @return The set of paused partitions
+     */
+    @Override
+    public Set<TopicPartition> paused() {
+        acquire();
+        try {
+            return Collections.unmodifiableSet(subscriptions.pausedPartitions());
+        } finally {
+            release();
+        }
+    }
+
+    /**
      * Resume specified partitions which have been paused with {@link #pause(TopicPartition...)}.
New calls to
      * {@link #poll(long)} will return records from these partitions if there are any to
be fetched.
      * If the partitions were not previously paused, this method is a no-op.

http://git-wip-us.apache.org/repos/asf/kafka/blob/5db2c99e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 9efaf8c..af26357 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -200,6 +200,18 @@ public class SubscriptionState {
         return this.subscription;
     }
 
+    public Set<TopicPartition> pausedPartitions() {
+        HashSet<TopicPartition> paused = new HashSet<>();
+        for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet())
{
+            final TopicPartition tp = entry.getKey();
+            final TopicPartitionState state = entry.getValue();
+            if (state.paused) {
+                paused.add(tp);
+            }
+        }
+        return paused;
+    }
+
     /**
      * Get the subscription for the group. For the leader, this will include the union of
the
      * subscriptions of all group members. For followers, it is just that member's subscription.

http://git-wip-us.apache.org/repos/asf/kafka/blob/5db2c99e/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c65fd73..2ac024f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -57,13 +57,7 @@ public class KafkaConsumerTest {
 
     @Test
     public void testSubscription() {
-        Properties props = new Properties();
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSubscription");
-        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
-
-        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
-            props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
 
         consumer.subscribe(Collections.singletonList(topic));
         Assert.assertEquals(Collections.singleton(topic), consumer.subscription());
@@ -89,8 +83,7 @@ public class KafkaConsumerTest {
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
 
-        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
-                props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
         consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0)));
         consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
     }
@@ -116,4 +109,32 @@ public class KafkaConsumerTest {
             MockConsumerInterceptor.resetCounters();
         }
     }
+
+    @Test
+    public void testPause() {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+
+        consumer.assign(Collections.singletonList(tp0));
+        Assert.assertEquals(Collections.singleton(tp0), consumer.assignment());
+        Assert.assertTrue(consumer.paused().isEmpty());
+
+        consumer.pause(tp0);
+        Assert.assertEquals(Collections.singleton(tp0), consumer.paused());
+
+        consumer.resume(tp0);
+        Assert.assertTrue(consumer.paused().isEmpty());
+
+        consumer.unsubscribe();
+        Assert.assertTrue(consumer.paused().isEmpty());
+    }
+
+    private KafkaConsumer<byte[], byte[]> newConsumer() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my.consumer");
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+
+        return new KafkaConsumer<byte[], byte[]>(
+            props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+    }
 }


Mime
View raw message