kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject git commit: Revert "KAFKA-1637 SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group; reviewed by Neha Narkhede"
Date Wed, 15 Oct 2014 05:32:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk de432a09e -> 841387b23


Revert "KAFKA-1637 SimpleConsumer.fetchOffset returns wrong error code when no offset exists
for topic/partition/consumer group; reviewed by Neha Narkhede"

This reverts commit de432a09e632f78df9e580b51277f81582c3f026.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/841387b2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/841387b2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/841387b2

Branch: refs/heads/trunk
Commit: 841387b23ac30707dd7f24a79de441c3302458d4
Parents: de432a0
Author: Joel Koshy <jjkoshy@gmail.com>
Authored: Tue Oct 14 22:31:50 2014 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Tue Oct 14 22:31:50 2014 -0700

----------------------------------------------------------------------
 .../kafka/common/OffsetMetadataAndError.scala    |  1 -
 core/src/main/scala/kafka/server/KafkaApis.scala |  8 +-------
 .../unit/kafka/server/OffsetCommitTest.scala     | 19 +++----------------
 3 files changed, 4 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/841387b2/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 4cabffe..1586243 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -51,6 +51,5 @@ object OffsetMetadataAndError {
   val NoOffset = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata,
ErrorMapping.NoError)
   val OffsetsLoading = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata,
ErrorMapping.OffsetsLoadInProgressCode)
   val NotOffsetManagerForGroup = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset,
OffsetAndMetadata.NoMetadata, ErrorMapping.NotCoordinatorForConsumerCode)
-  val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata,
ErrorMapping.UnknownTopicOrPartitionCode)
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/841387b2/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6ad64d2..67f2833 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -505,13 +505,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleOffsetFetchRequest(request: RequestChannel.Request) {
     val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
 
-    // Missing
-    val (missingTopicPartitions, availableTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition
=>
-      replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition).isEmpty
-    )
-    val missingStatus = missingTopicPartitions.map(topicAndPartition => (topicAndPartition,
OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
-    val availableStatus = offsetManager.getOffsets(offsetFetchRequest.groupId, availableTopicPartitions).toMap
-    val status = missingStatus ++ availableStatus
+    val status = offsetManager.getOffsets(offsetFetchRequest.groupId, offsetFetchRequest.requestInfo).toMap
 
     val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/841387b2/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 8c5364f..2d93250 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -116,13 +116,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness
{
     val topic1 = "topic-1"
     val topic2 = "topic-2"
     val topic3 = "topic-3"
-    val topic4 = "topic-4" // Topic that group never consumes
-    val topic5 = "topic-5" // Non-existent topic
-
-    createTopic(zkClient, topic1, servers = Seq(server), numPartitions = 1)
-    createTopic(zkClient, topic2, servers = Seq(server), numPartitions = 2)
-    createTopic(zkClient, topic3, servers = Seq(server), numPartitions = 1)
-    createTopic(zkClient, topic4, servers = Seq(server), numPartitions = 1)
+    val topic4 = "topic-4"
 
     val commitRequest = OffsetCommitRequest("test-group", immutable.Map(
       TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="metadata
one"),
@@ -142,8 +136,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
       TopicAndPartition(topic3, 0),
       TopicAndPartition(topic2, 1),
       TopicAndPartition(topic3, 1), // An unknown partition
-      TopicAndPartition(topic4, 0), // An unused topic
-      TopicAndPartition(topic5, 0)  // An unknown topic
+      TopicAndPartition(topic4, 0)  // An unknown topic
     ))
     val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
 
@@ -151,12 +144,8 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness
{
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2,
0)).get.error)
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3,
0)).get.error)
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2,
1)).get.error)
-    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic3,
1)).get.error)
-    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic4,
0)).get.error)
-    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic5,
0)).get.error)
-    assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic3,
1)).get)
+    assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3,
1)).get)
     assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4,
0)).get)
-    assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic5,
0)).get)
 
     assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1,
0)).get.metadata)
     assertEquals("metadata two", fetchResponse.requestInfo.get(TopicAndPartition(topic2,
0)).get.metadata)
@@ -164,7 +153,6 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic2,
1)).get.metadata)
     assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic3,
1)).get.metadata)
     assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic4,
0)).get.metadata)
-    assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic5,
0)).get.metadata)
 
     assertEquals(42L, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.offset)
     assertEquals(43L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.offset)
@@ -172,7 +160,6 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(45L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.offset)
     assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3,
1)).get.offset)
     assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4,
0)).get.offset)
-    assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5,
0)).get.offset)
   }
 
   @Test


Mime
View raw message