kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Update consumer javadoc for invalid operations on unassigned partitions (#5005)
Date Mon, 28 May 2018 19:08:37 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8bf20bb  MINOR: Update consumer javadoc for invalid operations on unassigned partitions
 (#5005)
8bf20bb is described below

commit 8bf20bb58637a59aa816e9d9668741a15fe9b55d
Author: Manikumar Reddy O <manikumar.reddy@gmail.com>
AuthorDate: Tue May 29 00:38:14 2018 +0530

    MINOR: Update consumer javadoc for invalid operations on unassigned partitions  (#5005)
    
    Document cases where  `IllegalStateException` is raised when attempting an invalid operation
on an unassigned partition. Also change `position()` to raise `IllegalStateException` when
called on an unassigned partition for consistency.
---
 .../apache/kafka/clients/consumer/KafkaConsumer.java   | 18 ++++++++++--------
 .../integration/kafka/api/PlaintextConsumerTest.scala  |  2 +-
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index e5bc5c1..602c9d7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1401,8 +1401,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
      * is invoked for the same partition more than once, the latest offset will be used on
the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of consumption, to
reset the fetch offsets
      *
-     * @throws IllegalArgumentException if the provided TopicPartition is not assigned to
this consumer
-     *                                  or if provided offset is negative
+     * @throws IllegalArgumentException if the provided offset is negative
+     * @throws IllegalStateException if the provided TopicPartition is not assigned to this
consumer
      */
     @Override
     public void seek(TopicPartition partition, long offset) {
@@ -1423,7 +1423,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
      * first offset in all partitions only when {@link #poll(Duration)} or {@link #position(TopicPartition)}
are called.
      * If no partitions are provided, seek to the first offset for all of the currently assigned
partitions.
      *
-     * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided
TopicPartition is not assigned to this consumer
+     * @throws IllegalArgumentException if {@code partitions} is {@code null}
+     * @throws IllegalStateException if any of the provided partitions are not currently
assigned to this consumer
      */
     public void seekToBeginning(Collection<TopicPartition> partitions) {
         if (partitions == null)
@@ -1449,7 +1450,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
      * If {@code isolation.level=read_committed}, the end offset will be the Last Stable
Offset, i.e., the offset
      * of the first message with an open transaction.
      *
-     * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided
TopicPartition is not assigned to this consumer
+     * @throws IllegalArgumentException if {@code partitions} is {@code null}
+     * @throws IllegalStateException if any of the provided partitions are not currently
assigned to this consumer
      */
     public void seekToEnd(Collection<TopicPartition> partitions) {
         if (partitions == null)
@@ -1476,7 +1478,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
      *
      * @param partition The partition to get the position for
      * @return The current position of the consumer (that is, the offset of the next record
to be fetched)
-     * @throws IllegalArgumentException if the provided TopicPartition is not assigned to
this consumer
+     * @throws IllegalStateException if the provided TopicPartition is not assigned to this
consumer
      * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently
defined for
      *             the partition
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called
before or while this
@@ -1492,7 +1494,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
         acquireAndEnsureOpen();
         try {
             if (!this.subscriptions.isAssigned(partition))
-                throw new IllegalArgumentException("You can only check the position for partitions
assigned to this consumer.");
+                throw new IllegalStateException("You can only check the position for partitions
assigned to this consumer.");
             Long offset = this.subscriptions.position(partition);
             while (offset == null) {
                 // batch update fetch positions for any partitions without a valid position
@@ -1614,7 +1616,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
      * Note that this method does not affect partition subscription. In particular, it does
not cause a group
      * rebalance when automatic assignment is used.
      * @param partitions The partitions which should be paused
-     * @throws IllegalStateException if one of the provided partitions is not assigned to
this consumer
+     * @throws IllegalStateException if any of the provided partitions are not currently
assigned to this consumer
      */
     @Override
     public void pause(Collection<TopicPartition> partitions) {
@@ -1634,7 +1636,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
      * {@link #poll(Duration)} will return records from these partitions if there are any
to be fetched.
      * If the partitions were not previously paused, this method is a no-op.
      * @param partitions The partitions which should be resumed
-     * @throws IllegalStateException if one of the provided partitions is not assigned to
this consumer
+     * @throws IllegalStateException if any of the provided partitions are not currently
assigned to this consumer
      */
     @Override
     public void resume(Collection<TopicPartition> partitions) {
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index a06e9e3..372cc3f 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -633,7 +633,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertNull(this.consumers.head.committed(new TopicPartition(topic, 15)))
 
     // position() on a partition that we aren't subscribed to throws an exception
-    intercept[IllegalArgumentException] {
+    intercept[IllegalStateException] {
       this.consumers.head.position(new TopicPartition(topic, 15))
     }
 

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message