kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-1851; OffsetFetchRequest returns extra partitions when input only contains unknown partitions; patched by Jun Rao; reviewed by Neha Narkhede
Date Fri, 09 Jan 2015 19:31:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.2 f71933ef1 -> f88db16d1


kafka-1851; OffsetFetchRequest returns extra partitions when input only contains unknown partitions;
patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f88db16d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f88db16d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f88db16d

Branch: refs/heads/0.8.2
Commit: f88db16d15ff8a1883b7aed3c60eefa64faa764c
Parents: f71933e
Author: Jun Rao <junrao@gmail.com>
Authored: Fri Jan 9 11:31:47 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Jan 9 11:31:47 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaApis.scala            | 6 +++++-
 .../src/test/scala/unit/kafka/server/OffsetCommitTest.scala | 9 ++++++++-
 2 files changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f88db16d/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2f00992..9a61fcb 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -508,7 +508,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty
     )
     val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
-    val knownStatus = offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap
+    val knownStatus =
+      if (knownTopicPartitions.size > 0)
+        offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap
+      else
+        Map.empty[TopicAndPartition, OffsetMetadataAndError]
     val status = unknownStatus ++ knownStatus
 
     val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f88db16d/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 8c5364f..4a3a5b2 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -79,7 +79,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create the topic
     createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server))
 
-    val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
+    val commitRequest = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
     assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
@@ -109,6 +109,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata)
     assertEquals(100L, fetchResponse1.requestInfo.get(topicAndPartition).get.offset)
 
+    // Fetch an unknown topic and verify
+    val unknownTopicAndPartition = TopicAndPartition("unknownTopic", 0)
+    val fetchRequest2 = OffsetFetchRequest(group, Seq(unknownTopicAndPartition))
+    val fetchResponse2 = simpleConsumer.fetchOffsets(fetchRequest2)
+
+    assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse2.requestInfo.get(unknownTopicAndPartition).get)
+    assertEquals(1, fetchResponse2.requestInfo.size)
   }
 
   @Test


Mime
View raw message