kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2409; have KafkaConsumer.committed return null when there is no commit
Date Fri, 25 Sep 2015 18:04:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fe798641a -> 7e453df31


KAFKA-2409; have KafkaConsumer.committed return null when there is no commit

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Onur Karaman, Guozhang Wang

Closes #243 from hachikuji/KAFKA-2409


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7e453df3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7e453df3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7e453df3

Branch: refs/heads/trunk
Commit: 7e453df3179efed92d5bbe4ce9ee1d90a07cd503
Parents: fe79864
Author: Jason Gustafson <jason@confluent.io>
Authored: Fri Sep 25 11:08:24 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Sep 25 11:08:24 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/consumer/KafkaConsumer.java | 11 +++--------
 .../test/scala/integration/kafka/api/ConsumerTest.scala  |  5 ++---
 .../scala/integration/kafka/api/SSLConsumerTest.scala    |  6 ++----
 3 files changed, 7 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7e453df3/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3009f6b..2a3c763 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -775,7 +775,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
      *
      * @throws NoOffsetForPartitionException If there is no stored offset for a subscribed
partition and no automatic
      *             offset reset policy has been configured.
-     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse
and
+     * @throws org.apache.kafka.common.errors.OffsetOutOfRangeException If there is OffsetOutOfRange
error in fetchResponse and
      *         the defaultResetPolicy is NONE
      */
     @Override
@@ -814,7 +814,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
      * heart-beating, auto-commits, and offset updates.
      * @param timeout The maximum time to block in the underlying poll
      * @return The fetched records (may be empty)
-     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse
and
+     * @throws org.apache.kafka.common.errors.OffsetOutOfRangeException If there is OffsetOutOfRange
error in fetchResponse and
      *         the defaultResetPolicy is NONE
      */
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long
timeout) {
@@ -1039,9 +1039,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
      * consumer hasn't yet initialized its cache of committed offsets.
      *
      * @param partition The partition to check
-     * @return The last committed offset
-     * @throws NoOffsetForPartitionException If no offset has ever been committed by any
process for the given
-     *             partition.
+     * @return The last committed offset and metadata or null if there was no prior commit
      */
     @Override
     public OffsetAndMetadata committed(TopicPartition partition) {
@@ -1059,9 +1057,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
                 committed = offsets.get(partition);
             }
 
-            if (committed == null)
-                throw new NoOffsetForPartitionException("No offset has been committed for
partition " + partition);
-
             return committed;
         } finally {
             release();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7e453df3/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index c59b95b..af81a83 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -188,9 +188,8 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     val pos2 = this.consumers(0).position(tp2)
     this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
     assertEquals(3, this.consumers(0).committed(tp).offset)
-    intercept[NoOffsetForPartitionException] {
-      this.consumers(0).committed(tp2)
-    }
+    assertNull(this.consumers(0).committed(tp2))
+
     // positions should not change
     assertEquals(pos1, this.consumers(0).position(tp))
     assertEquals(pos2, this.consumers(0).position(tp2))

http://git-wip-us.apache.org/repos/asf/kafka/blob/7e453df3/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
index 65e3d71..b2fc057 100644
--- a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
@@ -169,10 +169,8 @@ class SSLConsumerTest extends KafkaServerTestHarness with Logging {
   def testPositionAndCommit() {
     sendRecords(5)
 
-    // committed() on a partition with no committed offset throws an exception
-    intercept[NoOffsetForPartitionException] {
-      this.consumers(0).committed(new TopicPartition(topic, 15))
-    }
+    // committed() on a partition with no committed offset returns null
+    assertNull(this.consumers(0).committed(new TopicPartition(topic, 15)))
 
     // position() on a partition that we aren't subscribed to throws an exception
     intercept[IllegalArgumentException] {


Mime
View raw message