KAFKA-1038; fetch response should use empty messageset instead of null when handling errors;
patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aebf7461
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aebf7461
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aebf7461
Branch: refs/heads/trunk
Commit: aebf746190685d055358ca122aedc424fe024afa
Parents: 0c1885b
Author: Jun Rao <junrao@gmail.com>
Authored: Fri Sep 13 16:29:39 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Sep 13 16:29:39 2013 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/api/FetchRequest.scala | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/aebf7461/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index a807c1f..fb2a230 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -25,6 +25,7 @@ import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.consumer.ConsumerConfig
import java.util.concurrent.atomic.AtomicInteger
import kafka.network.RequestChannel
+import kafka.message.MessageSet
case class PartitionFetchInfo(offset: Long, fetchSize: Int)
@@ -155,7 +156,7 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request):
Unit = {
val fetchResponsePartitionData = requestInfo.map {
case (topicAndPartition, data) =>
- (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
-1, null))
+ (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
-1, MessageSet.Empty))
}
val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData)
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))
|