kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6321; Consolidate calls to KafkaConsumer's `beginningOffsets()` and `endOffsets()` in ConsumerGroupCommand
Date Fri, 26 Jan 2018 04:40:34 GMT
This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 138f57f  KAFKA-6321; Consolidate calls to KafkaConsumer's `beginningOffsets()` and
`endOffsets()` in ConsumerGroupCommand
138f57f is described below

commit 138f57f16068b49a08a2f58155b71e72ff7ed273
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
AuthorDate: Thu Jan 25 20:40:11 2018 -0800

    KAFKA-6321; Consolidate calls to KafkaConsumer's `beginningOffsets()` and `endOffsets()`
in ConsumerGroupCommand
    
    Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
    
    Reviewers: Ted Yu <yuzhihong@gmail.com>, Jiangjie (Becket) Qin <becket.qin@gmail.com>
    
    Closes #4344 from vahidhashemian/KAFKA-6321
---
 .../scala/kafka/admin/ConsumerGroupCommand.scala   | 237 ++++++++++++---------
 1 file changed, 132 insertions(+), 105 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 4905a94..3aa821c 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -287,7 +287,10 @@ object ConsumerGroupCommand extends Logging {
 
     protected def opts: ConsumerGroupCommandOptions
 
-    protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult
+    protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult =
+      getLogEndOffsets(Seq(topicPartition)).get(topicPartition).getOrElse(LogOffsetResult.Ignore)
+
+    protected def getLogEndOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition,
LogOffsetResult]
 
     def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]])
 
@@ -302,43 +305,40 @@ object ConsumerGroupCommand extends Logging {
                                             consumerIdOpt: Option[String],
                                             hostOpt: Option[String],
                                             clientIdOpt: Option[String]): Array[PartitionAssignmentState]
= {
-      if (topicPartitions.isEmpty)
+      if (topicPartitions.isEmpty) {
         Array[PartitionAssignmentState](
           PartitionAssignmentState(group, coordinator, None, None, None, getLag(None, None),
consumerIdOpt, hostOpt, clientIdOpt, None)
         )
-      else {
-        var assignmentRows: Array[PartitionAssignmentState] = Array()
-        topicPartitions
-          .sortBy(_.partition)
-          .foreach { topicPartition =>
-            assignmentRows = assignmentRows :+ describePartition(group, coordinator, topicPartition.topic,
topicPartition.partition, getPartitionOffset(topicPartition),
-              consumerIdOpt, hostOpt, clientIdOpt)
-          }
-        assignmentRows
       }
+      else
+        describePartitions(group, coordinator, topicPartitions.sortBy(_.partition), getPartitionOffset,
consumerIdOpt, hostOpt, clientIdOpt)
     }
 
     private def getLag(offset: Option[Long], logEndOffset: Option[Long]): Option[Long] =
       offset.filter(_ != -1).flatMap(offset => logEndOffset.map(_ - offset))
 
-    private def describePartition(group: String,
-                                  coordinator: Option[Node],
-                                  topic: String,
-                                  partition: Int,
-                                  offsetOpt: Option[Long],
-                                  consumerIdOpt: Option[String],
-                                  hostOpt: Option[String],
-                                  clientIdOpt: Option[String]): PartitionAssignmentState
= {
-      def getDescribePartitionResult(logEndOffsetOpt: Option[Long]): PartitionAssignmentState
=
-        PartitionAssignmentState(group, coordinator, Option(topic), Option(partition), offsetOpt,
-                                 getLag(offsetOpt, logEndOffsetOpt), consumerIdOpt, hostOpt,
-                                 clientIdOpt, logEndOffsetOpt)
-
-      getLogEndOffset(new TopicPartition(topic, partition)) match {
-        case LogOffsetResult.LogOffset(logEndOffset) => getDescribePartitionResult(Some(logEndOffset))
-        case LogOffsetResult.Unknown => getDescribePartitionResult(None)
-        case LogOffsetResult.Ignore => null
+    private def describePartitions(group: String,
+                                   coordinator: Option[Node],
+                                   topicPartitions: Seq[TopicPartition],
+                                   getPartitionOffset: TopicPartition => Option[Long],
+                                   consumerIdOpt: Option[String],
+                                   hostOpt: Option[String],
+                                   clientIdOpt: Option[String]): Array[PartitionAssignmentState]
= {
+
+      def getDescribePartitionResult(topicPartition: TopicPartition, logEndOffsetOpt: Option[Long]):
PartitionAssignmentState = {
+        val offset = getPartitionOffset(topicPartition)
+        PartitionAssignmentState(group, coordinator, Option(topicPartition.topic), Option(topicPartition.partition),
offset,
+          getLag(offset, logEndOffsetOpt), consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt)
       }
+
+      getLogEndOffsets(topicPartitions).map {
+        logEndOffsetResult =>
+          logEndOffsetResult._2 match {
+            case LogOffsetResult.LogOffset(logEndOffset) => getDescribePartitionResult(logEndOffsetResult._1,
Some(logEndOffset))
+            case LogOffsetResult.Unknown => getDescribePartitionResult(logEndOffsetResult._1,
None)
+            case LogOffsetResult.Ignore => null
+          }
+      }.toArray
     }
 
     def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = throw new UnsupportedOperationException
@@ -423,21 +423,23 @@ object ConsumerGroupCommand extends Logging {
       }
     }
 
-    protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult = {
-      zkUtils.getLeaderForPartition(topicPartition.topic, topicPartition.partition) match
{
-        case Some(-1) => LogOffsetResult.Unknown
-        case Some(brokerId) =>
-          getZkConsumer(brokerId).map { consumer =>
-            val topicAndPartition = new TopicAndPartition(topicPartition)
-            val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime,
1)))
-            val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-            consumer.close()
-            LogOffsetResult.LogOffset(logEndOffset)
-          }.getOrElse(LogOffsetResult.Ignore)
-        case None =>
-          printError(s"No broker for partition '$topicPartition'")
-          LogOffsetResult.Ignore
-      }
+    protected def getLogEndOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition,
LogOffsetResult] = {
+      topicPartitions.map { topicPartition => (topicPartition,
+        zkUtils.getLeaderForPartition(topicPartition.topic, topicPartition.partition) match
{
+          case Some(-1) => LogOffsetResult.Unknown
+          case Some(brokerId) =>
+            getZkConsumer(brokerId).map { consumer =>
+              val topicAndPartition = new TopicAndPartition(topicPartition)
+              val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime,
1)))
+              val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+              consumer.close()
+              LogOffsetResult.LogOffset(logEndOffset)
+            }.getOrElse(LogOffsetResult.Ignore)
+          case None =>
+            printError(s"No broker for partition '$topicPartition'")
+            LogOffsetResult.Ignore
+        }
+      )}.toMap
     }
 
     private def getPartitionOffsets(group: String,
@@ -596,27 +598,34 @@ object ConsumerGroupCommand extends Logging {
         consumerGroupSummary.state, consumerGroupSummary.consumers.get.size)
     }
 
-    protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult = {
-      val offsets = getConsumer.endOffsets(List(topicPartition).asJava)
-      val logStartOffset = offsets.get(topicPartition)
-      LogOffsetResult.LogOffset(logStartOffset)
+    protected def getLogEndOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition,
LogOffsetResult] = {
+      val offsets = getConsumer.endOffsets(topicPartitions.asJava)
+      topicPartitions.map { topicPartition =>
+        val logEndOffset = offsets.get(topicPartition)
+        topicPartition -> LogOffsetResult.LogOffset(logEndOffset)
+      }.toMap
     }
 
-    protected def getLogStartOffset(topicPartition: TopicPartition): LogOffsetResult = {
-      val offsets = getConsumer.beginningOffsets(List(topicPartition).asJava)
-      val logStartOffset = offsets.get(topicPartition)
-      LogOffsetResult.LogOffset(logStartOffset)
+    protected def getLogStartOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition,
LogOffsetResult] = {
+      val offsets = getConsumer.beginningOffsets(topicPartitions.asJava)
+      topicPartitions.map { topicPartition =>
+        val logStartOffset = offsets.get(topicPartition)
+        topicPartition -> LogOffsetResult.LogOffset(logStartOffset)
+      }.toMap
     }
 
-    protected def getLogTimestampOffset(topicPartition: TopicPartition, timestamp: java.lang.Long):
LogOffsetResult = {
+    protected def getLogTimestampOffsets(topicPartitions: Seq[TopicPartition], timestamp:
java.lang.Long): Map[TopicPartition, LogOffsetResult] = {
       val consumer = getConsumer
-      consumer.assign(List(topicPartition).asJava)
-      val offsetsForTimes = consumer.offsetsForTimes(Map(topicPartition -> timestamp).asJava)
-      if (offsetsForTimes != null && !offsetsForTimes.isEmpty && offsetsForTimes.get(topicPartition)
!= null)
-        LogOffsetResult.LogOffset(offsetsForTimes.get(topicPartition).offset)
-      else {
-        getLogEndOffset(topicPartition)
-      }
+      consumer.assign(topicPartitions.asJava)
+
+      val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =
+        consumer.offsetsForTimes(topicPartitions.map(_ -> timestamp).toMap.asJava).asScala.partition(_._2
!= null)
+
+      val successfulLogTimestampOffsets = successfulOffsetsForTimes.map {
+        case (topicPartition, offsetAndTimestamp) => topicPartition -> LogOffsetResult.LogOffset(offsetAndTimestamp.offset)
+      }.toMap
+
+      successfulLogTimestampOffsets ++ getLogEndOffsets(unsuccessfulOffsetsForTimes.keySet.toSeq)
     }
 
     def close() {
@@ -703,57 +712,60 @@ object ConsumerGroupCommand extends Logging {
         }.toMap
     }
 
-    private def prepareOffsetsToReset(groupId: String, partitionsToReset: Iterable[TopicPartition]):
Map[TopicPartition, OffsetAndMetadata] = {
+    private def prepareOffsetsToReset(groupId: String, partitionsToReset: Seq[TopicPartition]):
Map[TopicPartition, OffsetAndMetadata] = {
       if (opts.options.has(opts.resetToOffsetOpt)) {
         val offset = opts.options.valueOf(opts.resetToOffsetOpt)
-        partitionsToReset.map {
-          topicPartition =>
-            val newOffset: Long = checkOffsetRange(topicPartition, offset)
-            (topicPartition, new OffsetAndMetadata(newOffset))
-        }.toMap
+        checkOffsetsRange(partitionsToReset.map((_, offset)).toMap).map {
+          case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
+        }
       } else if (opts.options.has(opts.resetToEarliestOpt)) {
+        val logStartOffsets = getLogStartOffsets(partitionsToReset)
         partitionsToReset.map { topicPartition =>
-          getLogStartOffset(topicPartition) match {
-            case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
+          logStartOffsets.get(topicPartition) match {
+            case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
             case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting starting
offset of topic partition: $topicPartition")
           }
         }.toMap
       } else if (opts.options.has(opts.resetToLatestOpt)) {
+        val logEndOffsets = getLogEndOffsets(partitionsToReset)
         partitionsToReset.map { topicPartition =>
-          getLogEndOffset(topicPartition) match {
-            case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
+          logEndOffsets.get(topicPartition) match {
+            case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
             case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending
offset of topic partition: $topicPartition")
           }
         }.toMap
       } else if (opts.options.has(opts.resetShiftByOpt)) {
         val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
-        partitionsToReset.map { topicPartition =>
+        val requestedOffsets = partitionsToReset.map { topicPartition =>
           val shiftBy = opts.options.valueOf(opts.resetShiftByOpt)
           val currentOffset = currentCommittedOffsets.getOrElse(topicPartition,
             throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition
since there is no current committed offset"))
-          val shiftedOffset = currentOffset + shiftBy
-          val newOffset: Long = checkOffsetRange(topicPartition, shiftedOffset)
-          (topicPartition, new OffsetAndMetadata(newOffset))
+          (topicPartition, currentOffset + shiftBy)
         }.toMap
+        checkOffsetsRange(requestedOffsets).map {
+          case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
+        }
       } else if (opts.options.has(opts.resetToDatetimeOpt)) {
         val timestamp = convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
+        val logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp)
         partitionsToReset.map { topicPartition =>
-          val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp)
+          val logTimestampOffset = logTimestampOffsets.get(topicPartition)
           logTimestampOffset match {
-            case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
+            case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
             case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset
by timestamp of topic partition: $topicPartition")
           }
         }.toMap
       } else if (opts.options.has(opts.resetByDurationOpt)) {
+        val duration = opts.options.valueOf(opts.resetByDurationOpt)
+        val durationParsed = DatatypeFactory.newInstance().newDuration(duration)
+        val now = new Date()
+        durationParsed.negate().addTo(now)
+        val timestamp = now.getTime
+        val logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp)
         partitionsToReset.map { topicPartition =>
-          val duration = opts.options.valueOf(opts.resetByDurationOpt)
-          val now = new Date()
-          val durationParsed = DatatypeFactory.newInstance().newDuration(duration)
-          durationParsed.negate().addTo(now)
-          val timestamp = now.getTime
-          val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp)
+          val logTimestampOffset = logTimestampOffsets.get(topicPartition)
           logTimestampOffset match {
-            case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
+            case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
             case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset
by timestamp of topic partition: $topicPartition")
           }
         }.toMap
@@ -761,40 +773,55 @@ object ConsumerGroupCommand extends Logging {
         val resetPlanPath = opts.options.valueOf(opts.resetFromFileOpt)
         val resetPlanCsv = Utils.readFileAsString(resetPlanPath)
         val resetPlan = parseResetPlan(resetPlanCsv)
-        resetPlan.keySet.map { topicPartition =>
-          val newOffset: Long = checkOffsetRange(topicPartition, resetPlan(topicPartition).offset())
-          (topicPartition, new OffsetAndMetadata(newOffset))
+        val requestedOffsets = resetPlan.keySet.map { topicPartition =>
+          (topicPartition, resetPlan(topicPartition).offset())
         }.toMap
+        checkOffsetsRange(requestedOffsets).map {
+          case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
+        }
       } else if (opts.options.has(opts.resetToCurrentOpt)) {
         val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
-        partitionsToReset.map { topicPartition =>
-          currentCommittedOffsets.get(topicPartition).map { offset =>
-            (topicPartition, new OffsetAndMetadata(offset))
-          }.getOrElse(
-            getLogEndOffset(topicPartition) match {
-              case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
-              case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting
ending offset of topic partition: $topicPartition")
-            }
-          )
+        val (partitionsToResetWithCommittedOffset, partitionsToResetWithoutCommittedOffset)
=
+          partitionsToReset.partition(currentCommittedOffsets.keySet.contains(_))
+
+        val preparedOffsetsForParititionsWithCommittedOffset = partitionsToResetWithCommittedOffset.map
{ topicPartition =>
+          (topicPartition, new OffsetAndMetadata(currentCommittedOffsets.get(topicPartition)
match {
+            case Some(offset) => offset
+            case _ => throw new IllegalStateException(s"Expected a valid current offset
for topic partition: $topicPartition")
+          }))
         }.toMap
+
+        val preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets(partitionsToResetWithoutCommittedOffset).map
{
+          case (topicPartition, LogOffsetResult.LogOffset(offset)) => (topicPartition,
new OffsetAndMetadata(offset))
+          case (topicPartition, _) => CommandLineUtils.printUsageAndDie(opts.parser, s"Error
getting ending offset of topic partition: $topicPartition")
+        }
+
+        preparedOffsetsForParititionsWithCommittedOffset ++ preparedOffsetsForPartitionsWithoutCommittedOffset
       } else {
         CommandLineUtils.printUsageAndDie(opts.parser, "Option '%s' requires one of the following
scenarios: %s".format(opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts) )
       }
     }
 
-    private def checkOffsetRange(topicPartition: TopicPartition, offset: Long) = {
-      getLogEndOffset(topicPartition) match {
-        case LogOffsetResult.LogOffset(endOffset) if offset > endOffset =>
-          warn(s"New offset ($offset) is higher than latest offset. Value will be set to
$endOffset")
-          endOffset
+    private def checkOffsetsRange(requestedOffsets: Map[TopicPartition, Long]) = {
+      val logStartOffsets = getLogStartOffsets(requestedOffsets.keySet.toSeq)
+      val logEndOffsets = getLogEndOffsets(requestedOffsets.keySet.toSeq)
+      requestedOffsets.map { case (topicPartition, offset) => (topicPartition,
+        logEndOffsets.get(topicPartition) match {
+          case Some(LogOffsetResult.LogOffset(endOffset)) if offset > endOffset =>
+            warn(s"New offset ($offset) is higher than latest offset for topic partition
$topicPartition. Value will be set to $endOffset")
+            endOffset
 
-        case _ => getLogStartOffset(topicPartition) match {
-          case LogOffsetResult.LogOffset(startOffset) if offset < startOffset =>
-            warn(s"New offset ($offset) is lower than earliest offset. Value will be set
to $startOffset")
-            startOffset
+          case Some(_) => logStartOffsets.get(topicPartition) match {
+            case Some(LogOffsetResult.LogOffset(startOffset)) if offset < startOffset
=>
+              warn(s"New offset ($offset) is lower than earliest offset for topic partition
$topicPartition. Value will be set to $startOffset")
+              startOffset
 
-          case _ => offset
-        }
+            case _ => offset
+          }
+
+          case None => // the control should not reach here
+            throw new IllegalStateException(s"Unexpected non-existing offset value for topic
partition $topicPartition")
+        })
       }
     }
 

-- 
To stop receiving notification emails like this one, please contact
jqin@apache.org.

Mime
View raw message