Repository: kafka Updated Branches: refs/heads/0.8.2 5257e9dde -> ede0ec321 Reverting KAFKA-1637 on 0.8.2 since it was only reverted on trunk Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ede0ec32 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ede0ec32 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ede0ec32 Branch: refs/heads/0.8.2 Commit: ede0ec3210cc778da28f30ea458c63d8ee46fc57 Parents: 5257e9d Author: Neha Narkhede Authored: Thu Oct 16 10:36:43 2014 -0700 Committer: Neha Narkhede Committed: Thu Oct 16 10:36:43 2014 -0700 ---------------------------------------------------------------------- .../kafka/common/OffsetMetadataAndError.scala | 1 - core/src/main/scala/kafka/server/KafkaApis.scala | 8 +------- .../unit/kafka/server/OffsetCommitTest.scala | 19 +++---------------- 3 files changed, 4 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ede0ec32/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala index 4cabffe..1586243 100644 --- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala +++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala @@ -51,6 +51,5 @@ object OffsetMetadataAndError { val NoOffset = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NoError) val OffsetsLoading = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.OffsetsLoadInProgressCode) val NotOffsetManagerForGroup = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NotCoordinatorForConsumerCode) - val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.UnknownTopicOrPartitionCode) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ede0ec32/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6ad64d2..67f2833 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -505,13 +505,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleOffsetFetchRequest(request: RequestChannel.Request) { val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest] - // Missing - val (missingTopicPartitions, availableTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition => - replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition).isEmpty - ) - val missingStatus = missingTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap - val availableStatus = offsetManager.getOffsets(offsetFetchRequest.groupId, availableTopicPartitions).toMap - val status = missingStatus ++ availableStatus + val status = offsetManager.getOffsets(offsetFetchRequest.groupId, offsetFetchRequest.requestInfo).toMap val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId) http://git-wip-us.apache.org/repos/asf/kafka/blob/ede0ec32/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 8c5364f..2d93250 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -116,13 +116,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { val topic1 = "topic-1" val topic2 = "topic-2" val topic3 = "topic-3" - val topic4 = "topic-4" // Topic that group never consumes - val topic5 = "topic-5" // Non-existent topic - - createTopic(zkClient, topic1, servers = Seq(server), numPartitions = 1) - createTopic(zkClient, topic2, servers = Seq(server), numPartitions = 2) - createTopic(zkClient, topic3, servers = Seq(server), numPartitions = 1) - createTopic(zkClient, topic4, servers = Seq(server), numPartitions = 1) + val topic4 = "topic-4" val commitRequest = OffsetCommitRequest("test-group", immutable.Map( TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="metadata one"), @@ -142,8 +136,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { TopicAndPartition(topic3, 0), TopicAndPartition(topic2, 1), TopicAndPartition(topic3, 1), // An unknown partition - TopicAndPartition(topic4, 0), // An unused topic - TopicAndPartition(topic5, 0) // An unknown topic + TopicAndPartition(topic4, 0) // An unknown topic )) val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest) @@ -151,12 +144,8 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.error) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error) - assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error) - assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error) - assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error) - assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get) + assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get) assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get) - assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get) assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata) assertEquals("metadata two", fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.metadata) @@ -164,7 +153,6 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.metadata) assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.metadata) assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.metadata) - assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.metadata) assertEquals(42L, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.offset) assertEquals(43L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.offset) @@ -172,7 +160,6 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(45L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.offset) assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset) assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset) - assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.offset) } @Test