This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8bf20bb MINOR: Update consumer javadoc for invalid operations on unassigned partitions (#5005)
8bf20bb is described below
commit 8bf20bb58637a59aa816e9d9668741a15fe9b55d
Author: Manikumar Reddy O <manikumar.reddy@gmail.com>
AuthorDate: Tue May 29 00:38:14 2018 +0530
MINOR: Update consumer javadoc for invalid operations on unassigned partitions (#5005)
Document cases where `IllegalStateException` is raised when attempting an invalid operation
on an unassigned partition. Also change `position()` to raise `IllegalStateException` when
called on an unassigned partition for consistency.
---
.../apache/kafka/clients/consumer/KafkaConsumer.java | 18 ++++++++++--------
.../integration/kafka/api/PlaintextConsumerTest.scala | 2 +-
2 files changed, 11 insertions(+), 9 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index e5bc5c1..602c9d7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1401,8 +1401,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
* you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
*
- * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
- * or if provided offset is negative
+ * @throws IllegalArgumentException if the provided offset is negative
+ * @throws IllegalStateException if the provided TopicPartition is not assigned to this consumer
*/
@Override
public void seek(TopicPartition partition, long offset) {
@@ -1423,7 +1423,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* first offset in all partitions only when {@link #poll(Duration)} or {@link #position(TopicPartition)} are called.
* If no partitions are provided, seek to the first offset for all of the currently assigned partitions.
*
- * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer
+ * @throws IllegalArgumentException if {@code partitions} is {@code null}
+ * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
public void seekToBeginning(Collection<TopicPartition> partitions) {
if (partitions == null)
@@ -1449,7 +1450,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* If {@code isolation.level=read_committed}, the end offset will be the Last Stable Offset, i.e., the offset
* of the first message with an open transaction.
*
- * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer
+ * @throws IllegalArgumentException if {@code partitions} is {@code null}
+ * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
public void seekToEnd(Collection<TopicPartition> partitions) {
if (partitions == null)
@@ -1476,7 +1478,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*
* @param partition The partition to get the position for
* @return The current position of the consumer (that is, the offset of the next record to be fetched)
- * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
+ * @throws IllegalStateException if the provided TopicPartition is not assigned to this consumer
* @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
* the partition
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
@@ -1492,7 +1494,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
acquireAndEnsureOpen();
try {
if (!this.subscriptions.isAssigned(partition))
- throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
+ throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
Long offset = this.subscriptions.position(partition);
while (offset == null) {
// batch update fetch positions for any partitions without a valid position
@@ -1614,7 +1616,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* Note that this method does not affect partition subscription. In particular, it does not cause a group
* rebalance when automatic assignment is used.
* @param partitions The partitions which should be paused
- * @throws IllegalStateException if one of the provided partitions is not assigned to this consumer
+ * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
@Override
public void pause(Collection<TopicPartition> partitions) {
@@ -1634,7 +1636,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* {@link #poll(Duration)} will return records from these partitions if there are any to be fetched.
* If the partitions were not previously paused, this method is a no-op.
* @param partitions The partitions which should be resumed
- * @throws IllegalStateException if one of the provided partitions is not assigned to this consumer
+ * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
@Override
public void resume(Collection<TopicPartition> partitions) {
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index a06e9e3..372cc3f 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -633,7 +633,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertNull(this.consumers.head.committed(new TopicPartition(topic, 15)))
// position() on a partition that we aren't subscribed to throws an exception
- intercept[IllegalArgumentException] {
+ intercept[IllegalStateException] {
this.consumers.head.position(new TopicPartition(topic, 15))
}
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.