kafka-commits mailing list archives

From: guozh...@apache.org
Subject: kafka git commit: KAFKA-5273: Make KafkaConsumer.committed query the server for all partitions
Date: Wed, 24 May 2017 06:09:09 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 44294a894 -> 23de4f4cd


KAFKA-5273: Make KafkaConsumer.committed query the server for all partitions

Before this patch, the consumer would return the cached offsets for partitions in its current
assignment. This worked as long as all offset commits went through the consumer.

With KIP-98, offsets can be committed transactionally through the producer. This means that
relying on the consumer's cached positions can return stale information: since commits go
through the producer, the consumer's cache is never updated.
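
For illustration, a minimal sketch of this flow (the topic names, group id,
and offset are made up; `producer` is assumed to be an already-configured
transactional producer):

    // Offsets are committed through the transactional producer, so the
    // consumer's cached view of committed offsets is never refreshed.
    producer.initTransactions();
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("output-topic", "key", "value"));
    producer.sendOffsetsToTransaction(
        Collections.singletonMap(new TopicPartition("input-topic", 0),
                                 new OffsetAndMetadata(42L)),
        "my-consumer-group");
    producer.commitTransaction();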

Hence we need to update the `KafkaConsumer.committed` method to always look up the last
committed offset on the server, ensuring it returns the correct information every time.
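
A hedged caller-side sketch of the new behavior (the partition name is
hypothetical); this mirrors the `resetToCommittedPositions` helper added to
`TestUtils` in this patch:

    // committed() now always fetches from the group coordinator, so it also
    // sees offsets committed transactionally by a producer. The call may
    // therefore block on a remote round trip.
    TopicPartition tp = new TopicPartition("input-topic", 0);
    OffsetAndMetadata committed = consumer.committed(tp);
    if (committed != null)
        consumer.seek(tp, committed.offset());
    else
        consumer.seekToBeginning(Collections.singletonList(tp));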

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Jason Gustafson, Guozhang Wang

Closes #3119 from apurvam/KAFKA-5273-kafkaconsumer-committed-should-always-hit-server

(cherry picked from commit 4d89db968257c0c3273ece7e5546cb95ddcbeb46)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/23de4f4c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/23de4f4c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/23de4f4c

Branch: refs/heads/0.11.0
Commit: 23de4f4cdda23181fe0b925dd9feb2444dc06e3a
Parents: 44294a8
Author: Apurva Mehta <apurva@confluent.io>
Authored: Tue May 23 23:08:57 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue May 23 23:09:06 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java     | 18 +++---------------
 .../kafka/clients/consumer/OffsetAndMetadata.java | 14 ++++++++++----
 .../kafka/clients/consumer/KafkaConsumerTest.java | 10 ++++++++--
 .../kafka/api/TransactionsBounceTest.scala        |  5 ++---
 .../integration/kafka/api/TransactionsTest.scala  |  8 +++-----
 .../test/scala/unit/kafka/utils/TestUtils.scala   | 10 ++++++++++
 6 files changed, 36 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/23de4f4c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 6bcc086..3c6d409 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1292,8 +1292,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Get the last committed offset for the given partition (whether the commit happened by this process or
      * another). This offset will be used as the position for the consumer in the event of a failure.
      * <p>
-     * This call may block to do a remote call if the partition in question isn't assigned to this consumer or if the
-     * consumer hasn't yet initialized its cache of committed offsets.
+     * This call will block to do a remote call to get the latest committed offsets from the server.
      *
      * @param partition The partition to check
      * @return The last committed offset and metadata or null if there was no prior commit
@@ -1309,19 +1308,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public OffsetAndMetadata committed(TopicPartition partition) {
         acquire();
         try {
-            OffsetAndMetadata committed;
-            if (subscriptions.isAssigned(partition)) {
-                committed = this.subscriptions.committed(partition);
-                if (committed == null) {
-                    coordinator.refreshCommittedOffsetsIfNeeded();
-                    committed = this.subscriptions.committed(partition);
-                }
-            } else {
-                Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition));
-                committed = offsets.get(partition);
-            }
-
-            return committed;
+            Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition));
+            return offsets.get(partition);
         } finally {
             release();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/23de4f4c/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
index 9d06f29..262d8f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+
 import java.io.Serializable;
 
 /**
@@ -34,7 +36,12 @@ public class OffsetAndMetadata implements Serializable {
      */
     public OffsetAndMetadata(long offset, String metadata) {
         this.offset = offset;
-        this.metadata = metadata;
+        // The server converts null metadata to an empty string. So we store it as an empty string as well on the client
+        // to be consistent.
+        if (metadata == null)
+            this.metadata = OffsetFetchResponse.NO_METADATA;
+        else
+            this.metadata = metadata;
     }
 
     /**
@@ -62,14 +69,13 @@ public class OffsetAndMetadata implements Serializable {
         OffsetAndMetadata that = (OffsetAndMetadata) o;
 
         if (offset != that.offset) return false;
-        return metadata == null ? that.metadata == null : metadata.equals(that.metadata);
-
+        return metadata.equals(that.metadata);
     }
 
     @Override
     public int hashCode() {
         int result = (int) (offset ^ (offset >>> 32));
-        result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
+        result = 31 * result + metadata.hashCode();
         return result;
     }
 

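One consequence of the `OffsetAndMetadata` change above, as a small sketch:
null metadata is normalized to `OffsetFetchResponse.NO_METADATA` (the empty
string), so `equals` and `hashCode` no longer need null checks:

    // Both instances store "" as the metadata, so they compare equal.
    new OffsetAndMetadata(5L, null).equals(new OffsetAndMetadata(5L, ""));  // true
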
http://git-wip-us.apache.org/repos/asf/kafka/blob/23de4f4c/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 5928a28..9117f71 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -510,10 +510,12 @@ public class KafkaConsumerTest {
         // fetch offset for two topics
         Map<TopicPartition, Long> offsets = new HashMap<>();
         offsets.put(tp0, offset1);
-        offsets.put(tp1, offset2);
         client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
-
         assertEquals(offset1, consumer.committed(tp0).offset());
+
+        offsets.remove(tp0);
+        offsets.put(tp1, offset2);
+        client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
         assertEquals(offset2, consumer.committed(tp1).offset());
     }
 
@@ -1149,6 +1151,10 @@ public class KafkaConsumerTest {
 
         client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
         assertEquals(0, consumer.committed(tp0).offset());
+
+        offsets.remove(tp0);
+        offsets.put(tp1, 0L);
+        client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
         assertEquals(0, consumer.committed(tp1).offset());
 
         // fetch and verify consumer's position in the two partitions

http://git-wip-us.apache.org/repos/asf/kafka/blob/23de4f4c/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index 110e680..5d46348 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -83,7 +83,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
 
     TestUtils.seedTopicWithNumberedRecords(inputTopic, numInputRecords, servers)
 
-    var consumer = createConsumerAndSubscribeToTopics(consumerGroup, List(inputTopic))
+    val consumer = createConsumerAndSubscribeToTopics(consumerGroup, List(inputTopic))
     val producer = TestUtils.createTransactionalProducer("my-test-producer-t.id", servers)
 
     val scheduler = new BounceScheduler
@@ -110,8 +110,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
         if (shouldAbort) {
           trace(s"Committed offsets. Aborting transaction of ${records.size} messages.")
           producer.abortTransaction()
-          consumer.close()
-          consumer = createConsumerAndSubscribeToTopics(consumerGroup, List(inputTopic))
+          TestUtils.resetToCommittedPositions(consumer)
         } else {
           trace(s"Committed offsets. committing transaction of ${records.size} messages.")
           producer.commitTransaction()

http://git-wip-us.apache.org/repos/asf/kafka/blob/23de4f4c/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 1832dc2..ec6b3ea 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -116,7 +116,7 @@ class TransactionsTest extends KafkaServerTestHarness {
 
     val producer = TestUtils.createTransactionalProducer(transactionalId, servers)
 
-    var consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4)
+    val consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4)
     consumer.subscribe(List(topic1))
     producer.initTransactions()
 
@@ -145,10 +145,8 @@ class TransactionsTest extends KafkaServerTestHarness {
           producer.abortTransaction()
           debug(s"aborted transaction Last committed record: ${new String(records.last.value(),
"UTF-8")}. Num " +
             s"records written to $topic2: $recordsProcessed")
-          consumer.close()
-          consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4)
-          consumer.subscribe(List(topic1))
-        }
+          TestUtils.resetToCommittedPositions(consumer)
+       }
       }
     } finally {
       producer.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/23de4f4c/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 5097637..70c340b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1413,6 +1413,16 @@ object TestUtils extends Logging {
     records
   }
 
+  def resetToCommittedPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) = {
+    consumer.assignment.foreach { case(topicPartition) =>
+      val offset = consumer.committed(topicPartition)
+      if (offset != null)
+        consumer.seek(topicPartition, offset.offset)
+      else
+        consumer.seekToBeginning(Collections.singletonList(topicPartition))
+    }
+  }
+
 }
 
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {

