kafka-commits mailing list archives

From: guozh...@apache.org
Subject: kafka git commit: KAFKA-5327: ConsoleConsumer should manually commit offsets for records that are returned in receive()
Date: Fri, 02 Jun 2017 19:08:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 f5aee9bd5 -> 4671a486c


KAFKA-5327: ConsoleConsumer should manually commit offsets for records that are returned in receive()

KAFKA-5327: ConsoleConsumer should manually commit offsets only for those records it has actually consumed. Currently it leaves this job to the automatic offset-commit scheme, which can commit offsets for messages that were fetched but never read, so unread messages are silently skipped when `--max-messages` is set.

Author: amethystic <huxi_2b@hotmail.com>
Author: huxi <huxi_2b@hotmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3148 from amethystic/KAFKA-5327_ConsoleConsumer_distable_autocommit

(cherry picked from commit d7d1196a0b542adb46d22eeb5b6d12af950b64c9)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
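
For background, the pattern this commit adopts can be shown as a small standalone program: auto-commit disabled, offsets tracked per record actually delivered, and one explicit commitSync at the end. This is a minimal sketch, not the patch itself; the broker address, topic name, group id, and message cap are made-up values.

import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConverters._
import scala.collection.mutable

object ManualCommitSketch extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // assumed broker
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-demo")       // made-up group id
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")          // the crux of the fix
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.ByteArrayDeserializer")

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  consumer.subscribe(Collections.singletonList("test"))  // made-up topic

  val offsets = mutable.HashMap[TopicPartition, Long]()
  val maxMessages = 10  // stands in for --max-messages
  var consumed = 0

  // Loops until maxMessages records are printed; a real tool would also time out.
  while (consumed < maxMessages) {
    val records = consumer.poll(1000L).asScala  // may fetch more than we will print
    for (record <- records if consumed < maxMessages) {
      println(new String(record.value))
      // Remember the position to commit: one past the last record we printed.
      offsets.put(new TopicPartition(record.topic, record.partition), record.offset + 1)
      consumed += 1
    }
  }

  // Commit only what was printed; records fetched but never printed stay
  // uncommitted and will be redelivered to the group.
  consumer.commitSync(offsets.map { case (tp, off) => (tp, new OffsetAndMetadata(off)) }.asJava)
  consumer.close()
}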


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4671a486
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4671a486
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4671a486

Branch: refs/heads/0.11.0
Commit: 4671a486cfe2757cd04011f4f969a44e7d37b54a
Parents: f5aee9b
Author: amethystic <huxi_2b@hotmail.com>
Authored: Fri Jun 2 12:08:26 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Jun 2 12:08:32 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/consumer/BaseConsumer.scala     | 46 ++++++++++++++------
 .../scala/kafka/tools/ConsoleConsumer.scala     |  3 ++
 2 files changed, 36 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4671a486/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index cec74d0..fd5aa41 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -17,18 +17,22 @@
 
 package kafka.consumer
 
+import java.util
 import java.util.{Collections, Properties}
 import java.util.regex.Pattern
 
 import kafka.api.OffsetRequest
 import kafka.common.StreamEndException
 import kafka.message.Message
+import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata}
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.header.Headers
 import org.apache.kafka.common.header.internals.RecordHeaders
 
+import scala.collection.mutable.HashMap
+
 /**
  * A base consumer used to abstract both old and new consumer
  * this class should be removed (along with BaseProducer)
@@ -60,8 +64,13 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
   import org.apache.kafka.clients.consumer.KafkaConsumer
 
   val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
+  val offsets = new HashMap[TopicPartition, Long]()
+
   consumerInit()
-  var recordIter = consumer.poll(0).iterator
+  private var currentPartition: TopicPartition = null
+  private var polledRecords = consumer.poll(0)
+  private var partitionIter = polledRecords.partitions.iterator
+  private var recordIter: util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null
 
   def consumerInit() {
     (topic, partitionId, offset, whitelist) match {
@@ -93,21 +102,30 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
   }
 
   override def receive(): BaseConsumerRecord = {
-    if (!recordIter.hasNext) {
-      recordIter = consumer.poll(timeoutMs).iterator
-      if (!recordIter.hasNext)
-        throw new ConsumerTimeoutException
+    if (recordIter == null || !recordIter.hasNext) {
+      if (!partitionIter.hasNext) {
+        polledRecords = consumer.poll(timeoutMs)
+        partitionIter = polledRecords.partitions.iterator
+
+        if (!partitionIter.hasNext)
+          throw new ConsumerTimeoutException
+      }
+
+      currentPartition = partitionIter.next
+      recordIter = polledRecords.records(currentPartition).iterator
     }
 
     val record = recordIter.next
+    offsets.put(currentPartition, record.offset + 1)
+
     BaseConsumerRecord(record.topic,
-                       record.partition,
-                       record.offset,
-                       record.timestamp,
-                       record.timestampType,
-                       record.key,
-                       record.value,
-                       record.headers)
+      record.partition,
+      record.offset,
+      record.timestamp,
+      record.timestampType,
+      record.key,
+      record.value,
+      record.headers)
   }
 
   override def stop() {
@@ -119,7 +137,9 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
   }
 
   override def commit() {
-    this.consumer.commitSync()
+    import scala.collection.JavaConverters._
+    consumer.commitSync(offsets.map { case (tp, offset) =>  (tp, new OffsetAndMetadata(offset))}.asJava)
+    offsets.clear()
   }
 }
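
The restructured receive() above walks each polled batch partition by partition, so the code always knows which TopicPartition the record being handed out came from and can keep a per-partition "next offset to commit" map. The same iteration pattern, pulled out as a standalone helper (the drain name and its shape are hypothetical, not part of the patch):

import java.util

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition

import scala.collection.mutable

object DrainSketch {
  // Drain one polled batch partition by partition, recording for each
  // partition the offset one past the last record actually read.
  def drain(batch: ConsumerRecords[Array[Byte], Array[Byte]],
            offsets: mutable.Map[TopicPartition, Long]): Unit = {
    val partitionIter = batch.partitions.iterator  // java.util.Iterator[TopicPartition]
    while (partitionIter.hasNext) {
      val tp = partitionIter.next
      val recordIter: util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] =
        batch.records(tp).iterator
      while (recordIter.hasNext) {
        val record = recordIter.next
        // Committed positions point at the next record to read, hence offset + 1.
        offsets.put(tp, record.offset + 1)
      }
    }
  }
}

Iterating via ConsumerRecords.partitions and records(tp), rather than the flat iterator the old code used, is what makes the patch's per-partition offset bookkeeping possible.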
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4671a486/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 335c724..a1e2ffa 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -77,6 +77,7 @@ object ConsoleConsumer extends Logging {
     try {
       process(conf.maxMessages, conf.formatter, consumer, System.out, conf.skipMessageOnError)
     } finally {
+      consumer.commit()
       consumer.cleanup()
       conf.formatter.close()
       reportRecordCount()
@@ -200,7 +201,9 @@ object ConsoleConsumer extends Logging {
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
     props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.isolationLevel)
+
     props
   }
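
With enable.auto.commit forced to "false" above, the group's committed position moves only when ConsoleConsumer's final commit() runs in the finally block. One way to observe the effect after a bounded --max-messages run is to read the committed offset back; a sketch against the same 0.11-era client API, with made-up group id, topic, and partition:

import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

object CommittedOffsetCheck extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // assumed broker
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-12345")   // made-up group id
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.ByteArrayDeserializer")

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  // committed() returns null when the group has no committed offset for the partition.
  val committed = consumer.committed(new TopicPartition("test", 0))
  println(if (committed == null) "no committed offset" else s"committed offset: ${committed.offset}")
  consumer.close()
}

With the fix in place, the committed offset should match the number of records actually printed rather than the consumer's fetch position.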
 

