kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-763 delta; Add an option to replicate from the largest offset during unclean leader election; patched by Swapnil Ghike; reviewed by Jun Rao
Date Wed, 13 Mar 2013 22:11:34 GMT
Updated Branches:
  refs/heads/0.8 0a9283530 -> 3b3fb7fed


kafka-763 delta; Add an option to replicate from the largest offset during unclean leader election;
patched by Swapnil Ghike; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3b3fb7fe
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3b3fb7fe
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3b3fb7fe

Branch: refs/heads/0.8
Commit: 3b3fb7fed622cc0339c3b9e3fb175e6093e427a3
Parents: 0a92835
Author: Swapnil Ghike <sghike@linkedin.com>
Authored: Wed Mar 13 15:11:15 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Mar 13 15:11:15 2013 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherThread.scala     |    7 +------
 .../main/scala/kafka/consumer/SimpleConsumer.scala |    1 +
 2 files changed, 2 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3b3fb7fe/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 1dfc75c..80df1b5 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -58,12 +58,7 @@ class ConsumerFetcherThread(name: String,
       case OffsetRequest.LargestTimeString => startTimestamp = OffsetRequest.LatestTime
       case _ => startTimestamp = OffsetRequest.LatestTime
     }
-    val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(startTimestamp, 1)))
-    val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
-    val newOffset = partitionErrorAndOffset.error match {
-      case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
-      case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
-    }
+    val newOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, startTimestamp, Request.OrdinaryConsumerId)
     val pti = partitionMap(topicAndPartition)
     pti.resetFetchOffset(newOffset)
     pti.resetConsumeOffset(newOffset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b3fb7fe/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 5a0784a..1fbdfc3 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -141,6 +141,7 @@ class SimpleConsumer(val host: String,
    */
   def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = {
     val request = OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
+                                clientId = clientId,
                                 replicaId = consumerId)
     val partitionErrorAndOffset = getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
     val offset = partitionErrorAndOffset.error match {


Mime
View raw message