kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: Hot fix for LIKAFKA-3492; force offset commit/fetches to go to kafka regardless of request version
Date Thu, 16 Jul 2015 05:26:55 GMT
Repository: kafka
Updated Branches:
  refs/heads/hotfix [created] a098de48e


Hot fix for LIKAFKA-3492; force offset commit/fetches to go to kafka regardless of request version


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a098de48
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a098de48
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a098de48

Branch: refs/heads/hotfix
Commit: a098de48e61bd0e713e88f3429f84cd57b5fb97d
Parents: 9f80665
Author: Joel Koshy <jjkoshy@gmail.com>
Authored: Wed Jul 15 22:26:17 2015 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Wed Jul 15 22:26:17 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaApis.scala        | 12 +++++++-----
 .../test/scala/unit/kafka/server/OffsetCommitTest.scala |  2 +-
 2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a098de48/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d63bc18..528d759 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -159,7 +159,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId,
response)))
     }
 
-    if (offsetCommitRequest.versionId == 0) {
+    // hot fix for LIKAFKA-3492 (do not let offset commits/fetch requests go to zookeeper)
+    /*if (offsetCommitRequest.versionId == 0) {
       // for version 0 always store offsets to ZK
       val responseInfo = offsetCommitRequest.requestInfo.map {
         case (topicAndPartition, metaAndError) => {
@@ -181,7 +182,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       sendResponseCallback(responseInfo)
-    } else {
+    } else {*/
       // for version 1 and beyond store offsets in offset manager
 
       // compute the retention time based on the request version:
@@ -222,7 +223,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         offsetCommitRequest.groupGenerationId,
         offsetData,
         sendResponseCallback)
-    }
+    //}
   }
 
   /**
@@ -473,7 +474,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleOffsetFetchRequest(request: RequestChannel.Request) {
     val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
 
-    val response = if (offsetFetchRequest.versionId == 0) {
+    // hot fix for LIKAFKA-3492 (do not let offset commits/fetch requests go to zookeeper)
+    val response = /*if (offsetFetchRequest.versionId == 0) {
       // version 0 reads offsets from ZK
       val responseInfo = offsetFetchRequest.requestInfo.map( topicAndPartition => {
         val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicAndPartition.topic)
@@ -495,7 +497,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       })
 
       OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), offsetFetchRequest.correlationId)
-    } else {
+    } else */ {
       // version 1 reads offsets from Kafka
       val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition
=>
         metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty

http://git-wip-us.apache.org/repos/asf/kafka/blob/a098de48/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 528525b..b4a882b 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -235,7 +235,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
       versionId = 0
     )
     assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest0).commitStatus.get(topicPartition).get)
-    assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
+    //assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
     // committed offset should exist with fetch version 0
     assertEquals(1L, simpleConsumer.fetchOffsets(OffsetFetchRequest(group, Seq(TopicAndPartition(topic,
0)), versionId = 0)).requestInfo.get(topicPartition).get.offset)


Mime
View raw message