kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-4135; Consumer poll with no subscription or assignment should raise an error
Date Mon, 19 Sep 2016 23:43:09 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3281b3c90 -> 4e1c7d844


KAFKA-4135; Consumer poll with no subscription or assignment should raise an error

When the consumer is not subscribed to any topic or, in the case of manual assignment, is
not assigned any partition, calling `poll()` should raise an exception.

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #1839 from vahidhashemian/KAFKA-4135


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4e1c7d84
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4e1c7d84
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4e1c7d84

Branch: refs/heads/trunk
Commit: 4e1c7d844f743e5b439447e645fa41d2f92b8b5f
Parents: 3281b3c
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Mon Sep 19 16:42:43 2016 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon Sep 19 16:42:43 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  6 ++++
 .../consumer/internals/SubscriptionState.java   |  4 +++
 .../clients/consumer/KafkaConsumerTest.java     | 32 ++++++++++++++++++++
 3 files changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4e1c7d84/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f7f2d20..108c0cb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -954,6 +954,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             topics or to the configured groupId
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
      *             session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
+     * @throws java.lang.IllegalArgumentException if the timeout value is negative
+     * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
+     *             partitions to consume from
      */
     @Override
     public ConsumerRecords<K, V> poll(long timeout) {
@@ -962,6 +965,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             if (timeout < 0)
                 throw new IllegalArgumentException("Timeout must not be negative");
 
+            if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
+                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
+
             // poll for new data until the timeout expires
             long start = time.milliseconds();
             long remaining = timeout;

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e1c7d84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 9029417..6dc2060 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -212,6 +212,10 @@ public class SubscriptionState {
         return this.subscriptionType == SubscriptionType.AUTO_PATTERN;
     }
 
+    public boolean hasNoSubscriptionOrUserAssignment() {
+        return this.subscriptionType == SubscriptionType.NONE;
+    }
+
     public void unsubscribe() {
         this.subscription = Collections.emptySet();
         this.userAssignment = Collections.emptySet();

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e1c7d84/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index fd0794c..2408c11 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -935,6 +935,38 @@ public class KafkaConsumerTest {
         consumer.close();
     }
 
+    @Test(expected = IllegalStateException.class)
+    public void testPollWithNoSubscription() {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        try {
+            consumer.poll(0);
+        } finally {
+            consumer.close();
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testPollWithEmptySubscription() {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        consumer.subscribe(Collections.<String>emptyList());
+        try {
+            consumer.poll(0);
+        } finally {
+            consumer.close();
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testPollWithEmptyUserAssignment() {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        consumer.assign(Collections.<TopicPartition>emptySet());
+        try {
+            consumer.poll(0);
+        } finally {
+            consumer.close();
+        }
+    }
+
     private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<String, String> consumer) {
         return new ConsumerRebalanceListener() {
             @Override


Mime
View raw message