kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3905: Handling null/empty topics and collections, patterns when subscription with list of topics or with patterns, and with assignments.
Date Thu, 14 Jul 2016 09:50:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f17790cd9 -> 9f94a7752


KAFKA-3905: Handling null/empty topics and collections, patterns when subscription with list
of topics or with patterns, and with assignments.

- Added validity checks for the input parameters of subscribe and assign, so that invalid
input raises an IllegalArgumentException instead of a NullPointerException
- Updated behavior of assign with an empty collection to be the same as subscribe with an
empty list, i.e., it unsubscribes. A null collection is rejected with an IllegalArgumentException.
- Added tests for subscribe and assign

Author: Rekha Joshi <rekhajoshm@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #1601 from rekhajoshm/KAFKA-3905-1


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f94a775
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f94a775
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f94a775

Branch: refs/heads/trunk
Commit: 9f94a7752a590c36186a7eb1eee16e992ec68c97
Parents: f17790c
Author: Rekha Joshi <rekhajoshm@gmail.com>
Authored: Thu Jul 14 10:26:30 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Jul 14 10:44:58 2016 +0100

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  41 ++++++--
 .../clients/consumer/KafkaConsumerTest.java     | 102 +++++++++++++++++++
 2 files changed, 136 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9f94a775/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 524bdfc..ff94dc8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -790,15 +790,22 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
      * @param topics The list of topics to subscribe to
      * @param listener Non-null listener instance to get notifications on partition assignment/revocation
for the
      *                 subscribed topics
+     * @throws IllegalArgumentException If topics is null or contains null or empty elements
      */
     @Override
     public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
{
         acquire();
         try {
-            if (topics.isEmpty()) {
+            if (topics == null) {
+                throw new IllegalArgumentException("Topic collection to subscribe to cannot
be null");
+            } else if (topics.isEmpty()) {
                 // treat subscribing to empty topic list as the same as unsubscribing
                 this.unsubscribe();
             } else {
+                for (String topic : topics) {
+                    if (topic == null || topic.trim().isEmpty())
+                        throw new IllegalArgumentException("Topic collection to subscribe
to cannot contain null or empty topic");
+                }
                 log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
                 this.subscriptions.subscribe(topics, listener);
                 metadata.setTopics(subscriptions.groupSubscription());
@@ -824,6 +831,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
      * management since the listener gives you an opportunity to commit offsets before a
rebalance finishes.
      *
      * @param topics The list of topics to subscribe to
+     * @throws IllegalArgumentException If topics is null or contains null or empty elements
      */
     @Override
     public void subscribe(Collection<String> topics) {
@@ -833,6 +841,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
     /**
      * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
The pattern matching will be done periodically against topics
      * existing at the time of check.
+     *
      * <p>
      * As part of group management, the consumer will keep track of the list of consumers
that
      * belong to a particular group and will trigger a rebalance operation if one of the
@@ -845,11 +854,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
      * </ul>
      *
      * @param pattern Pattern to subscribe to
+     * @throws IllegalArgumentException If pattern is null
      */
     @Override
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
         acquire();
         try {
+            if (pattern == null)
+                throw new IllegalArgumentException("Topic pattern to subscribe to cannot
be null");
             log.debug("Subscribed to pattern: {}", pattern);
             this.subscriptions.subscribe(pattern, listener);
             this.metadata.needMetadataForAllTopics(true);
@@ -878,6 +890,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
     /**
      * Manually assign a list of partition to this consumer. This interface does not allow
for incremental assignment
      * and will replace the previous assignment (if there is one).
+     *
+     * If the given list of topic partition is empty, it is treated the same as {@link #unsubscribe()}.
+     *
      * <p>
      * Manual topic assignment through this method does not use the consumer's group management
      * functionality. As such, there will be no rebalance operation triggered when group
membership or cluster and topic
@@ -885,17 +900,29 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
      * and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}.
      *
      * @param partitions The list of partitions to assign this consumer
+     * @throws IllegalArgumentException If partitions is null or contains null or empty topics
      */
     @Override
     public void assign(Collection<TopicPartition> partitions) {
         acquire();
         try {
-            log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
-            this.subscriptions.assignFromUser(partitions);
-            Set<String> topics = new HashSet<>();
-            for (TopicPartition tp : partitions)
-                topics.add(tp.topic());
-            metadata.setTopics(topics);
+            if (partitions == null) {
+                throw new IllegalArgumentException("Topic partition collection to assign
to cannot be null");
+            } else if (partitions.isEmpty()) {
+                this.unsubscribe();
+            } else {
+                Set<String> topics = new HashSet<>();
+                for (TopicPartition tp : partitions) {
+                    String topic = (tp != null) ? tp.topic() : null;
+                    if (topic == null || topic.trim().isEmpty())
+                        throw new IllegalArgumentException("Topic partitions to assign to
cannot have null or empty topic");
+                    topics.add(topic);
+                }
+
+                log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
+                this.subscriptions.assignFromUser(partitions);
+                metadata.setTopics(topics);
+            }
         } finally {
             release();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9f94a775/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 3cbb62f..b5a5fca 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
@@ -65,6 +66,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
 
 import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
@@ -148,6 +150,53 @@ public class KafkaConsumerTest {
     }
 
     @Test(expected = IllegalArgumentException.class)
+    public void testSubscriptionOnNullTopicCollection() {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+
+        try {
+            consumer.subscribe(null);
+        } finally {
+            consumer.close();
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSubscriptionOnNullTopic() {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        String nullTopic = null;
+
+        try {
+            consumer.subscribe(Collections.singletonList(nullTopic));
+        } finally {
+            consumer.close();
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSubscriptionOnEmptyTopic() {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        String emptyTopic = "  ";
+
+        try {
+            consumer.subscribe(Collections.singletonList(emptyTopic));
+        } finally {
+            consumer.close();
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSubscriptionOnNullPattern() {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        Pattern pattern = null;
+
+        try {
+            consumer.subscribe(pattern, new NoOpConsumerRebalanceListener());
+        } finally {
+            consumer.close();
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
     public void testSeekNegative() {
         Properties props = new Properties();
         props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSeekNegative");
@@ -162,6 +211,59 @@ public class KafkaConsumerTest {
         }
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void testAssignOnNullTopicPartition() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testAssignOnNullTopicPartition");
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        try {
+            consumer.assign(null);
+        } finally {
+            consumer.close();
+        }
+    }
+
+    @Test
+    public void testAssignOnEmptyTopicPartition() {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+
+        consumer.assign(Collections.<TopicPartition>emptyList());
+        assertTrue(consumer.subscription().isEmpty());
+        assertTrue(consumer.assignment().isEmpty());
+
+        consumer.close();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAssignOnNullTopicInPartition() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testAssignOnNullTopicInPartition");
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        try {
+            consumer.assign(Arrays.asList(new TopicPartition(null, 0)));
+        } finally {
+            consumer.close();
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAssignOnEmptyTopicInPartition() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testAssignOnEmptyTopicInPartition");
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        try {
+            consumer.assign(Arrays.asList(new TopicPartition("  ", 0)));
+        } finally {
+            consumer.close();
+        }
+    }
+
     @Test
     public void testInterceptorConstructorClose() throws Exception {
         try {


Mime
View raw message