KAFKA-1583; Encapsulate replicated log implementation details into ReplicaManager and refactor KafkaApis; reviewed by Joel Koshy and Jun Rao

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/89831204
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/89831204
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/89831204

Branch: refs/heads/trunk
Commit: 89831204c092f3a417bf41945925a2e9a0ec828e
Parents: 20f5b01
Author: Guozhang Wang
Authored: Wed Oct 29 17:11:03 2014 -0700
Committer: Joel Koshy
Committed: Wed Oct 29 18:56:23 2014 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/api/FetchRequest.scala |   1 -
 .../main/scala/kafka/api/FetchResponse.scala    |  12 +-
 .../scala/kafka/api/OffsetCommitRequest.scala   |  24 +-
 .../main/scala/kafka/api/ProducerRequest.scala  |   1 -
 .../main/scala/kafka/api/ProducerResponse.scala |   3 +-
 .../main/scala/kafka/cluster/Partition.scala    |  50 ++-
 .../main/scala/kafka/common/ErrorMapping.scala  |   2 +
 core/src/main/scala/kafka/log/Log.scala         |  26 +-
 .../kafka/network/BoundedByteBufferSend.scala   |   4 +-
 .../main/scala/kafka/server/DelayedFetch.scala  | 125 ++++---
 .../scala/kafka/server/DelayedProduce.scala     | 135 +++----
 .../kafka/server/FetchRequestPurgatory.scala    |  69 ----
 .../src/main/scala/kafka/server/KafkaApis.scala | 325 ++++++----
 .../main/scala/kafka/server/OffsetManager.scala |  93 ++++-
 .../kafka/server/ProducerRequestPurgatory.scala |  69 ----
 .../scala/kafka/server/ReplicaManager.scala     | 370 +++++++++++++------
 .../scala/kafka/server/RequestPurgatory.scala   | 336 ++++++++---------
 .../main/scala/kafka/utils/DelayedItem.scala    |   8 +-
 .../kafka/api/ProducerFailureHandlingTest.scala |  18 +-
 .../unit/kafka/producer/SyncProducerTest.scala  |   4 +-
 .../server/HighwatermarkPersistenceTest.scala   |   7 +
 .../unit/kafka/server/ISRExpirationTest.scala   |  17 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |   6 +
 .../kafka/server/RequestPurgatoryTest.scala     | 103 +++---
 .../unit/kafka/server/ServerShutdownTest.scala  |   2 +-
 .../unit/kafka/server/SimpleFetchTest.scala     | 270 ++++----------
 26 files changed, 1006 insertions(+), 1074 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 59c0915..b038c15 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -30,7 +30,6 @@ import scala.collection.immutable.Map
 
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
 
-
 object FetchRequest {
   val CurrentVersion = 0.shortValue
   val DefaultMaxWait = 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 8d085a1..75aaf57 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -25,6 +25,8 @@ import kafka.message.{MessageSet, ByteBufferMessageSet}
 import kafka.network.{MultiSend, Send}
 import kafka.api.ApiUtils._
 
+import scala.collection._
+
 object
FetchResponsePartitionData { def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = { val error = buffer.getShort @@ -150,10 +152,8 @@ object FetchResponse { } } - -case class FetchResponse(correlationId: Int, - data: Map[TopicAndPartition, FetchResponsePartitionData]) - extends RequestOrResponse() { +case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchResponsePartitionData]) + extends RequestOrResponse() { /** * Partitions the data into a map of maps (one for each topic). @@ -171,8 +171,8 @@ case class FetchResponse(correlationId: Int, /* * FetchResponse uses [sendfile](http://man7.org/linux/man-pages/man2/sendfile.2.html) - * api for data transfer, so `writeTo` aren't actually being used. - * It is implemented as an empty function to comform to `RequestOrResponse.writeTo` + * api for data transfer through the FetchResponseSend, so `writeTo` aren't actually being used. + * It is implemented as an empty function to conform to `RequestOrResponse.writeTo` * abstract method signature. */ def writeTo(buffer: ByteBuffer): Unit = throw new UnsupportedOperationException http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/api/OffsetCommitRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala index 861a6cf..050615c 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala @@ -78,28 +78,12 @@ case class OffsetCommitRequest(groupId: String, groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID, consumerId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID) extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) { + assert(versionId == 0 || versionId == 1, "Version " + versionId + " is invalid for OffsetCommitRequest. 
Valid versions are 0 or 1.") lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) - def filterLargeMetadata(maxMetadataSize: Int) = - requestInfo.filter(info => info._2.metadata == null || info._2.metadata.length <= maxMetadataSize) - - def responseFor(errorCode: Short, offsetMetadataMaxSize: Int) = { - val commitStatus = requestInfo.map {info => - (info._1, if (info._2.metadata != null && info._2.metadata.length > offsetMetadataMaxSize) - ErrorMapping.OffsetMetadataTooLargeCode - else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode) - ErrorMapping.ConsumerCoordinatorNotAvailableCode - else if (errorCode == ErrorMapping.NotLeaderForPartitionCode) - ErrorMapping.NotCoordinatorForConsumerCode - else - errorCode) - }.toMap - OffsetCommitResponse(commitStatus, correlationId) - } - def writeTo(buffer: ByteBuffer) { // Write envelope buffer.putShort(versionId) @@ -150,8 +134,10 @@ case class OffsetCommitRequest(groupId: String, override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { val errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]) - val errorResponse = responseFor(errorCode, Int.MaxValue) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + val commitStatus = requestInfo.mapValues(_ => errorCode) + val commitResponse = OffsetCommitResponse(commitStatus, correlationId) + + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(commitResponse))) } override def describe(details: Boolean): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/api/ProducerRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index b2366e7..570b2da 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -153,7 +153,6 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, producerRequest.toString() } - def emptyData(){ data.clear() } http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/api/ProducerResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala index a286272..5d1fac4 100644 --- a/core/src/main/scala/kafka/api/ProducerResponse.scala +++ b/core/src/main/scala/kafka/api/ProducerResponse.scala @@ -43,8 +43,7 @@ object ProducerResponse { case class ProducerResponseStatus(var error: Short, offset: Long) -case class ProducerResponse(correlationId: Int, - status: Map[TopicAndPartition, ProducerResponseStatus]) +case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus]) extends RequestOrResponse() { /** http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index e88ecf2..1be5700 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -230,7 +230,33 @@ class Partition(val topic: String, } } - def 
updateLeaderHWAndMaybeExpandIsr(replicaId: Int) { + /** + * Update the log end offset of a certain replica of this partition + */ + def updateReplicaLEO(replicaId: Int, offset: LogOffsetMetadata) = { + getReplica(replicaId) match { + case Some(replica) => + replica.logEndOffset = offset + + // check if we need to expand ISR to include this replica + // if it is not in the ISR yet + maybeExpandIsr(replicaId) + + debug("Recorded replica %d log end offset (LEO) position %d for partition %s." + .format(replicaId, offset.messageOffset, TopicAndPartition(topic, partitionId))) + case None => + throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" + + " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId, + offset.messageOffset, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId)) + } + } + + /** + * Check and maybe expand the ISR of the partition. + * + * This function can be triggered when a replica's LEO has incremented + */ + def maybeExpandIsr(replicaId: Int) { inWriteLock(leaderIsrUpdateLock) { // check if this replica needs to be added to the ISR leaderReplicaIfLocal() match { @@ -252,7 +278,10 @@ class Partition(val topic: String, updateIsr(newInSyncReplicas) replicaManager.isrExpandRate.mark() } + // check if the HW of the partition can now be incremented + // since the replica maybe now be in the ISR and its LEO has just incremented maybeIncrementLeaderHW(leaderReplica) + case None => // nothing to do if no longer leader } } @@ -272,6 +301,7 @@ class Partition(val topic: String, val minIsr = leaderReplica.log.get.config.minInSyncReplicas trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId)) + if (requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset ) { /* * requiredAcks < 0 means acknowledge after all replicas in ISR @@ -298,8 +328,14 @@ class Partition(val topic: String, } /** - * There is no need to acquire the leaderIsrUpdate lock here since all callers of this private API acquire that lock - * @param leaderReplica + * Check and maybe increment the high watermark of the partition; + * this function can be triggered when + * + * 1. Partition ISR changed + * 2. Any replica's LEO changed + * + * Note There is no need to acquire the leaderIsrUpdate lock here + * since all callers of this private API acquire that lock */ private def maybeIncrementLeaderHW(leaderReplica: Replica) { val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset) @@ -310,8 +346,8 @@ class Partition(val topic: String, debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark)) // some delayed requests may be unblocked after HW changed val requestKey = new TopicPartitionRequestKey(this.topic, this.partitionId) - replicaManager.unblockDelayedFetchRequests(requestKey) - replicaManager.unblockDelayedProduceRequests(requestKey) + replicaManager.tryCompleteDelayedFetch(requestKey) + replicaManager.tryCompleteDelayedProduce(requestKey) } else { debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. 
All leo's are %s" .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(","))) @@ -362,7 +398,7 @@ class Partition(val topic: String, stuckReplicas ++ slowReplicas } - def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int=0) = { + def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = { inReadLock(leaderIsrUpdateLock) { val leaderReplicaOpt = leaderReplicaIfLocal() leaderReplicaOpt match { @@ -379,7 +415,7 @@ class Partition(val topic: String, val info = log.append(messages, assignOffsets = true) // probably unblock some follower fetch requests since log end offset has been updated - replicaManager.unblockDelayedFetchRequests(new TopicPartitionRequestKey(this.topic, this.partitionId)) + replicaManager.tryCompleteDelayedFetch(new TopicPartitionRequestKey(this.topic, this.partitionId)) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(leaderReplica) info http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/common/ErrorMapping.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index 880ab4a..eedc2f5 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -84,4 +84,6 @@ object ErrorMapping { throw codeToException(code).newInstance() def exceptionFor(code: Short) : Throwable = codeToException(code).newInstance() + + def exceptionNameFor(code: Short) : String = codeToException(code).getName() } http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 157d673..37b4a85 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -31,6 +31,21 @@ import scala.collection.JavaConversions import com.yammer.metrics.core.Gauge +object LogAppendInfo { + val UnknownLogAppendInfo = LogAppendInfo(-1, -1, NoCompressionCodec, -1, -1, false) +} + +/** + * Struct to hold various quantities we compute about each message set before appending to the log + * @param firstOffset The first offset in the message set + * @param lastOffset The last offset in the message set + * @param shallowCount The number of shallow messages + * @param validBytes The number of valid bytes + * @param codec The codec used in the message set + * @param offsetsMonotonic Are the offsets in this message set monotonically increasing + */ +case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean) + /** * An append-only log for storing messages. 
@@ -311,17 +326,6 @@ class Log(val dir: File, } /** - * Struct to hold various quantities we compute about each message set before appending to the log - * @param firstOffset The first offset in the message set - * @param lastOffset The last offset in the message set - * @param shallowCount The number of shallow messages - * @param validBytes The number of valid bytes - * @param codec The codec used in the message set - * @param offsetsMonotonic Are the offsets in this message set monotonically increasing - */ - case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean) - - /** * Validate the following: *
    *
  1. each message matches its CRC http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala index a624359..55ecac2 100644 --- a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala +++ b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala @@ -25,7 +25,7 @@ import kafka.api.RequestOrResponse @nonthreadsafe private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send { - private var sizeBuffer = ByteBuffer.allocate(4) + private val sizeBuffer = ByteBuffer.allocate(4) // Avoid possibility of overflow for 2GB-4 byte buffer if(buffer.remaining > Int.MaxValue - sizeBuffer.limit) @@ -53,7 +53,7 @@ private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send def writeTo(channel: GatheringByteChannel): Int = { expectIncomplete() - var written = channel.write(Array(sizeBuffer, buffer)) + val written = channel.write(Array(sizeBuffer, buffer)) // if we are done, mark it off if(!buffer.hasRemaining) complete = true http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/server/DelayedFetch.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index e0f14e2..1ccbb4b 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -17,75 +17,110 @@ package kafka.server -import kafka.network.RequestChannel -import kafka.api.{FetchResponse, FetchRequest} -import kafka.common.{UnknownTopicOrPartitionException, NotLeaderForPartitionException, TopicAndPartition} +import kafka.api.FetchResponsePartitionData +import kafka.api.PartitionFetchInfo +import kafka.common.UnknownTopicOrPartitionException +import kafka.common.NotLeaderForPartitionException +import kafka.common.TopicAndPartition -import scala.collection.immutable.Map -import scala.collection.Seq +import scala.collection._ + +case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionFetchInfo) { + + override def toString = "[startOffsetMetadata: " + startOffsetMetadata + ", " + + "fetchInfo: " + fetchInfo + "]" +} /** - * A delayed fetch request, which is satisfied (or more - * accurately, unblocked) -- if: - * Case A: This broker is no longer the leader for some partitions it tries to fetch - * - should return whatever data is available for the rest partitions. - * Case B: This broker is does not know of some partitions it tries to fetch - * - should return whatever data is available for the rest partitions. - * Case C: The fetch offset locates not on the last segment of the log - * - should return all the data on that segment. - * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes - * - should return whatever data is available. 
+ * The fetch metadata maintained by the delayed produce request */ +case class FetchMetadata(fetchMinBytes: Int, + fetchOnlyLeader: Boolean, + fetchOnlyCommitted: Boolean, + fetchPartitionStatus: Map[TopicAndPartition, FetchPartitionStatus]) { + + override def toString = "[minBytes: " + fetchMinBytes + ", " + + "onlyLeader:" + fetchOnlyLeader + ", " + "onlyCommitted: " + fetchOnlyCommitted + ", " + "partitionStatus: " + fetchPartitionStatus + "]" +} -class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey], - override val request: RequestChannel.Request, - override val delayMs: Long, - val fetch: FetchRequest, - private val partitionFetchOffsets: Map[TopicAndPartition, LogOffsetMetadata]) - extends DelayedRequest(keys, request, delayMs) { +/** + * A delayed fetch request that can be created by the replica manager and watched + * in the fetch request purgatory + */ +class DelayedFetch(delayMs: Long, + fetchMetadata: FetchMetadata, + replicaManager: ReplicaManager, + responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) + extends DelayedRequest(delayMs) { - def isSatisfied(replicaManager: ReplicaManager) : Boolean = { + /** + * The request can be completed if: + * + * Case A: This broker is no longer the leader for some partitions it tries to fetch + * Case B: This broker does not know of some partitions it tries to fetch + * Case C: The fetch offset locates not on the last segment of the log + * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes + * + * Upon completion, should return whatever data is available for each valid partition + */ + override def tryComplete() : Boolean = { var accumulatedSize = 0 - val fromFollower = fetch.isFromFollower - partitionFetchOffsets.foreach { - case (topicAndPartition, fetchOffset) => + fetchMetadata.fetchPartitionStatus.foreach { + case (topicAndPartition, fetchStatus) => + val fetchOffset = fetchStatus.startOffsetMetadata try { if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) { val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition) val endOffset = - if (fromFollower) - replica.logEndOffset - else + if (fetchMetadata.fetchOnlyCommitted) replica.highWatermark + else + replica.logEndOffset if (endOffset.offsetOnOlderSegment(fetchOffset)) { - // Case C, this can happen when the new follower replica fetching on a truncated leader - debug("Satisfying fetch request %s since it is fetching later segments of partition %s.".format(fetch, topicAndPartition)) - return true + // Case C, this can happen when the new fetch operation is on a truncated leader + debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition)) + return forceComplete() } else if (fetchOffset.offsetOnOlderSegment(endOffset)) { - // Case C, this can happen when the folloer replica is lagging too much - debug("Satisfying fetch request %s immediately since it is fetching older segments.".format(fetch)) - return true + // Case C, this can happen when the fetch operation is falling behind the current segment + // or the partition has just rolled a new segment + debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata)) + return forceComplete() } else if (fetchOffset.precedes(endOffset)) { - accumulatedSize += endOffset.positionDiff(fetchOffset) + // we need take the partition fetch size as upper bound when accumulating the bytes + accumulatedSize += 
math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize) } } } catch { - case utpe: UnknownTopicOrPartitionException => // Case A - debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetch)) - return true - case nle: NotLeaderForPartitionException => // Case B - debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetch)) - return true + case utpe: UnknownTopicOrPartitionException => // Case B + debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata)) + return forceComplete() + case nle: NotLeaderForPartitionException => // Case A + debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata)) + return forceComplete() } } // Case D - accumulatedSize >= fetch.minBytes + if (accumulatedSize >= fetchMetadata.fetchMinBytes) + forceComplete() + else + false } - def respond(replicaManager: ReplicaManager): FetchResponse = { - val topicData = replicaManager.readMessageSets(fetch) - FetchResponse(fetch.correlationId, topicData.mapValues(_.data)) + /** + * Upon completion, read whatever data is available and pass to the complete callback + */ + override def onComplete() { + val logReadResults = replicaManager.readFromLocalLog(fetchMetadata.fetchOnlyLeader, + fetchMetadata.fetchOnlyCommitted, + fetchMetadata.fetchPartitionStatus.mapValues(status => status.fetchInfo)) + + val fetchPartitionData = logReadResults.mapValues(result => + FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet)) + + responseCallback(fetchPartitionData) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/server/DelayedProduce.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 9481508..8049e07 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -17,99 +17,106 @@ package kafka.server -import kafka.api._ + +import kafka.api.ProducerResponseStatus import kafka.common.ErrorMapping import kafka.common.TopicAndPartition -import kafka.utils.Logging -import kafka.network.RequestChannel import scala.Some -import scala.collection.immutable.Map -import scala.collection.Seq - -/** A delayed produce request, which is satisfied (or more - * accurately, unblocked) -- if for every partition it produce to: - * Case A: This broker is not the leader: unblock - should return error. - * Case B: This broker is the leader: - * B.1 - If there was a localError (when writing to the local log): unblock - should return error - * B.2 - else, at least requiredAcks replicas should be caught up to this request. 
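For readers following the new DelayedFetch and DelayedProduce classes in this diff, the contract they implement can be summarized with a minimal sketch. This is an assumption-level simplification, not the actual kafka.server.DelayedRequest/RequestPurgatory code, which additionally handles watch keys, expiration and metrics.

import java.util.concurrent.atomic.AtomicBoolean

// Minimal model of the completion contract: tryComplete() checks the completion criteria and,
// when they are met, calls forceComplete(); forceComplete() guarantees onComplete() runs at most
// once, whether completion comes from a tryComplete() call or from expiration.
abstract class DelayedOperationSketch(val delayMs: Long) {
  private val completed = new AtomicBoolean(false)

  // Check the completion criteria; call forceComplete() and return its result if they are met.
  def tryComplete(): Boolean

  // The completion logic (e.g. build the response and invoke the callback); runs at most once.
  def onComplete(): Unit

  // Returns true only for the single caller that wins the race to complete this operation.
  def forceComplete(): Boolean = {
    if (completed.compareAndSet(false, true)) {
      onComplete()
      true
    } else false
  }
}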
- */ +import scala.collection._ -class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey], - override val request: RequestChannel.Request, - override val delayMs: Long, - val produce: ProducerRequest, - val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus], - val offsetCommitRequestOpt: Option[OffsetCommitRequest] = None) - extends DelayedRequest(keys, request, delayMs) with Logging { +case class ProducePartitionStatus(requiredOffset: Long, responseStatus: ProducerResponseStatus) { + @volatile var acksPending = false - // first update the acks pending variable according to the error code - partitionStatus foreach { case (topicAndPartition, delayedStatus) => - if (delayedStatus.responseStatus.error == ErrorMapping.NoError) { - // Timeout error state will be cleared when required acks are received - delayedStatus.acksPending = true - delayedStatus.responseStatus.error = ErrorMapping.RequestTimedOutCode - } else { - delayedStatus.acksPending = false - } + override def toString = "[acksPending: %b, error: %d, startOffset: %d, requiredOffset: %d]" + .format(acksPending, responseStatus.error, responseStatus.offset, requiredOffset) +} - trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus)) - } +/** + * The produce metadata maintained by the delayed produce request + */ +case class ProduceMetadata(produceRequiredAcks: Short, + produceStatus: Map[TopicAndPartition, ProducePartitionStatus]) { - def respond(offsetManager: OffsetManager): RequestOrResponse = { - val responseStatus = partitionStatus.mapValues(status => status.responseStatus) + override def toString = "[requiredAcks: %d, partitionStatus: %s]" + .format(produceRequiredAcks, produceStatus) +} - val errorCode = responseStatus.find { case (_, status) => - status.error != ErrorMapping.NoError - }.map(_._2.error).getOrElse(ErrorMapping.NoError) +/** + * A delayed produce request that can be created by the replica manager and watched + * in the produce request purgatory + */ +class DelayedProduce(delayMs: Long, + produceMetadata: ProduceMetadata, + replicaManager: ReplicaManager, + responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) + extends DelayedRequest(delayMs) { - if (errorCode == ErrorMapping.NoError) { - offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) ) + // first update the acks pending variable according to the error code + produceMetadata.produceStatus.foreach { case (topicAndPartition, status) => + if (status.responseStatus.error == ErrorMapping.NoError) { + // Timeout error state will be cleared when required acks are received + status.acksPending = true + status.responseStatus.error = ErrorMapping.RequestTimedOutCode + } else { + status.acksPending = false } - val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, offsetManager.config.maxMetadataSize)) - .getOrElse(ProducerResponse(produce.correlationId, responseStatus)) - - response + trace("Initial partition status for %s is %s".format(topicAndPartition, status)) } - def isSatisfied(replicaManager: ReplicaManager) = { + /** + * The delayed produce request can be completed if every partition + * it produces to is satisfied by one of the following: + * + * Case A: This broker is no longer the leader: set an error in response + * Case B: This broker is the leader: + * B.1 - If there was a local error thrown while checking if at least requiredAcks + * replicas have caught up to this request: set an error in response + * B.2 - Otherwise, 
set the response with no error. + */ + override def tryComplete(): Boolean = { // check for each partition if it still has pending acks - partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) => - trace("Checking producer request satisfaction for %s, acksPending = %b" - .format(topicAndPartition, fetchPartitionStatus.acksPending)) + produceMetadata.produceStatus.foreach { case (topicAndPartition, status) => + trace("Checking produce satisfaction for %s, current status %s" + .format(topicAndPartition, status)) // skip those partitions that have already been satisfied - if (fetchPartitionStatus.acksPending) { + if (status.acksPending) { val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) val (hasEnough, errorCode) = partitionOpt match { case Some(partition) => partition.checkEnoughReplicasReachOffset( - fetchPartitionStatus.requiredOffset, - produce.requiredAcks) + status.requiredOffset, + produceMetadata.produceRequiredAcks) case None => + // Case A (false, ErrorMapping.UnknownTopicOrPartitionCode) } if (errorCode != ErrorMapping.NoError) { - fetchPartitionStatus.acksPending = false - fetchPartitionStatus.responseStatus.error = errorCode + // Case B.1 + status.acksPending = false + status.responseStatus.error = errorCode } else if (hasEnough) { - fetchPartitionStatus.acksPending = false - fetchPartitionStatus.responseStatus.error = ErrorMapping.NoError + // Case B.2 + status.acksPending = false + status.responseStatus.error = ErrorMapping.NoError } } } - // unblocked if there are no partitions with pending acks - val satisfied = ! partitionStatus.exists(p => p._2.acksPending) - satisfied + // check if each partition has satisfied at lease one of case A and case B + if (! produceMetadata.produceStatus.values.exists(p => p.acksPending)) + forceComplete() + else + false } -} - -case class DelayedProduceResponseStatus(val requiredOffset: Long, - val responseStatus: ProducerResponseStatus) { - @volatile var acksPending = false - override def toString = - "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format( - acksPending, responseStatus.error, responseStatus.offset, requiredOffset) + /** + * Upon completion, return the current response status along with the error code per partition + */ + override def onComplete() { + val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus) + responseCallback(responseStatus) + } } + http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala deleted file mode 100644 index ed13188..0000000 --- a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import kafka.metrics.KafkaMetricsGroup -import kafka.network.RequestChannel -import kafka.api.FetchResponseSend - -import java.util.concurrent.TimeUnit - -/** - * The purgatory holding delayed fetch requests - */ -class FetchRequestPurgatory(replicaManager: ReplicaManager, requestChannel: RequestChannel) - extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) { - this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId) - - private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup { - private val metricPrefix = if (forFollower) "Follower" else "Consumer" - - val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS) - } - - private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true) - private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false) - - private def recordDelayedFetchExpired(forFollower: Boolean) { - val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics - else aggregateNonFollowerFetchRequestMetrics - - metrics.expiredRequestMeter.mark() - } - - /** - * Check if a specified delayed fetch request is satisfied - */ - def checkSatisfied(delayedFetch: DelayedFetch): Boolean = delayedFetch.isSatisfied(replicaManager) - - /** - * When a delayed fetch request expires just answer it with whatever data is present - */ - def expire(delayedFetch: DelayedFetch) { - debug("Expiring fetch request %s.".format(delayedFetch.fetch)) - val fromFollower = delayedFetch.fetch.isFromFollower - recordDelayedFetchExpired(fromFollower) - respond(delayedFetch) - } - - // TODO: purgatory should not be responsible for sending back the responses - def respond(delayedFetch: DelayedFetch) { - val response = delayedFetch.respond(replicaManager) - requestChannel.sendResponse(new RequestChannel.Response(delayedFetch.request, new FetchResponseSend(response))) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 85498b4..968b0c4 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -20,7 +20,6 @@ package kafka.server import kafka.api._ import kafka.common._ import kafka.log._ -import kafka.message._ import kafka.network._ import kafka.admin.AdminUtils import kafka.network.RequestChannel.Response @@ -42,12 +41,8 @@ class KafkaApis(val requestChannel: RequestChannel, val config: KafkaConfig, val controller: KafkaController) extends Logging { - val producerRequestPurgatory = new ProducerRequestPurgatory(replicaManager, offsetManager, requestChannel) - val fetchRequestPurgatory = new FetchRequestPurgatory(replicaManager, requestChannel) - // TODO: the following 
line will be removed in 0.9 - replicaManager.initWithRequestPurgatory(producerRequestPurgatory, fetchRequestPurgatory) - var metadataCache = new MetadataCache this.logIdent = "[KafkaApi-%d] ".format(brokerId) + val metadataCache = new MetadataCache /** * Top-level method that handles all requests and multiplexes to the right api @@ -56,7 +51,7 @@ class KafkaApis(val requestChannel: RequestChannel, try{ trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress) request.requestId match { - case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request) + case RequestKeys.ProduceKey => handleProducerRequest(request) case RequestKeys.FetchKey => handleFetchRequest(request) case RequestKeys.OffsetsKey => handleOffsetRequest(request) case RequestKeys.MetadataKey => handleTopicMetadataRequest(request) @@ -64,7 +59,7 @@ class KafkaApis(val requestChannel: RequestChannel, case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request) case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request) case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request) - case RequestKeys.OffsetCommitKey => handleProducerOrOffsetCommitRequest(request) + case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request) case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request) case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) @@ -123,179 +118,87 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse))) } - private def producerRequestFromOffsetCommit(offsetCommitRequest: OffsetCommitRequest) = { - val msgs = offsetCommitRequest.filterLargeMetadata(config.offsetMetadataMaxSize).map { - case (topicAndPartition, offset) => - new Message( - bytes = OffsetManager.offsetCommitValue(offset), - key = OffsetManager.offsetCommitKey(offsetCommitRequest.groupId, topicAndPartition.topic, topicAndPartition.partition) - ) - }.toSeq - - val producerData = mutable.Map( - TopicAndPartition(OffsetManager.OffsetsTopicName, offsetManager.partitionFor(offsetCommitRequest.groupId)) -> - new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, msgs:_*) - ) - - val request = ProducerRequest( - correlationId = offsetCommitRequest.correlationId, - clientId = offsetCommitRequest.clientId, - requiredAcks = config.offsetCommitRequiredAcks, - ackTimeoutMs = config.offsetCommitTimeoutMs, - data = producerData) - trace("Created producer request %s for offset commit request %s.".format(request, offsetCommitRequest)) - request - } /** - * Handle a produce request or offset commit request (which is really a specialized producer request) + * Handle an offset commit request */ - def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) { - val (produceRequest, offsetCommitRequestOpt) = - if (request.requestId == RequestKeys.OffsetCommitKey) { - val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] - (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest)) - } else { - (request.requestObj.asInstanceOf[ProducerRequest], None) - } - - val sTime = SystemTime.milliseconds - val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) - debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) - - val firstErrorCode = 
localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError) - - val numPartitionsInError = localProduceResults.count(_.error.isDefined) - if(produceRequest.requiredAcks == 0) { - // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since - // no response is expected by the producer the handler will send a close connection response to the socket server - // to close the socket so that the producer client will know that some exception has happened and will refresh its metadata - if (numPartitionsInError != 0) { - info(("Send the close connection response due to error handling produce request " + - "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0") - .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(","))) - requestChannel.closeConnection(request.processor, request) - } else { - - if (firstErrorCode == ErrorMapping.NoError) - offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo)) - - if (offsetCommitRequestOpt.isDefined) { - val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) - } else - requestChannel.noOperation(request.processor, request) - } - } else if (produceRequest.requiredAcks == 1 || - produceRequest.numPartitions <= 0 || - numPartitionsInError == produceRequest.numPartitions) { - - if (firstErrorCode == ErrorMapping.NoError) { - offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) ) + def handleOffsetCommitRequest(request: RequestChannel.Request) { + val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] + + // the callback for sending the response + def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) { + commitStatus.foreach { case (topicAndPartition, errorCode) => + // we only print warnings for known errors here; only replica manager could see an unknown + // exception while trying to write the offset message to the local log, and it will log + // an error message and write the error code in this case; hence it can be ignored here + if (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.UnknownCode) { + debug("Offset commit request with correlation id %d from client %s on partition %s failed due to %s" + .format(offsetCommitRequest.correlationId, offsetCommitRequest.clientId, + topicAndPartition, ErrorMapping.exceptionNameFor(errorCode))) + } } - val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap - val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize)) - .getOrElse(ProducerResponse(produceRequest.correlationId, statuses)) - + val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) - } else { - // create a list of (topic, partition) pairs to use as keys for this delayed request - val producerRequestKeys = produceRequest.data.keys.map( - topicAndPartition => new TopicPartitionRequestKey(topicAndPartition)).toSeq - val statuses = localProduceResults.map(r => - r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, 
r.start))).toMap - val delayedRequest = new DelayedProduce( - producerRequestKeys, - request, - produceRequest.ackTimeoutMs.toLong, - produceRequest, - statuses, - offsetCommitRequestOpt) - - // add the produce request for watch if it's not satisfied, otherwise send the response back - val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest) - if (satisfiedByMe) - producerRequestPurgatory.respond(delayedRequest) } - // we do not need the data anymore - produceRequest.emptyData() - } - - case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) { - def this(key: TopicAndPartition, throwable: Throwable) = - this(key, -1L, -1L, Some(throwable)) - - def errorCode = error match { - case None => ErrorMapping.NoError - case Some(error) => ErrorMapping.codeFor(error.getClass.asInstanceOf[Class[Throwable]]) - } + // call offset manager to store offsets + offsetManager.storeOffsets( + offsetCommitRequest.groupId, + offsetCommitRequest.consumerId, + offsetCommitRequest.groupGenerationId, + offsetCommitRequest.requestInfo, + sendResponseCallback) } /** - * Helper method for handling a parsed producer request + * Handle a produce request */ - private def appendToLocalLog(producerRequest: ProducerRequest, isOffsetCommit: Boolean): Iterable[ProduceResult] = { - val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data - trace("Append [%s] to local log ".format(partitionAndData.toString)) - partitionAndData.map {case (topicAndPartition, messages) => - try { - if (Topic.InternalTopics.contains(topicAndPartition.topic) && - !(isOffsetCommit && topicAndPartition.topic == OffsetManager.OffsetsTopicName)) { - throw new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic)) - } - val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) - val info = partitionOpt match { - case Some(partition) => - partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet],producerRequest.requiredAcks) - case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" - .format(topicAndPartition, brokerId)) + def handleProducerRequest(request: RequestChannel.Request) { + val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] + + // the callback for sending the response + def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { + var errorInResponse = false + responseStatus.foreach { case (topicAndPartition, status) => + // we only print warnings for known errors here; if it is unknown, it will cause + // an error message in the replica manager + if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) { + debug("Produce request with correlation id %d from client %s on partition %s failed due to %s" + .format(produceRequest.correlationId, produceRequest.clientId, + topicAndPartition, ErrorMapping.exceptionNameFor(status.error))) + errorInResponse = true } + } - val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1) + if (produceRequest.requiredAcks == 0) { + // no operation needed if producer request.required.acks = 0; however, if there is any error in handling + // the request, since no response is expected by the producer, the server will close socket server so that + // the producer client will know that some error has happened and will refresh its metadata + if 
(errorInResponse) { + info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0" + .format(produceRequest.correlationId, produceRequest.clientId)) + requestChannel.closeConnection(request.processor, request) + } else { + requestChannel.noOperation(request.processor, request) + } + } else { + val response = ProducerResponse(produceRequest.correlationId, responseStatus) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + } + } - // update stats for successfully appended bytes and messages as bytesInRate and messageInRate - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages) - BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages) + // call the replica manager to append messages to the replicas + replicaManager.appendMessages( + produceRequest.ackTimeoutMs.toLong, + produceRequest.requiredAcks, + produceRequest.data, + sendResponseCallback) - trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" - .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset)) - ProduceResult(topicAndPartition, info.firstOffset, info.lastOffset) - } catch { - // NOTE: Failed produce requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException - // since failed produce requests metric is supposed to indicate failure of a broker in handling a produce request - // for a partition it is the leader for - case e: KafkaStorageException => - fatal("Halting due to unrecoverable I/O error while handling produce request: ", e) - Runtime.getRuntime.halt(1) - null - case ite: InvalidTopicException => - warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( - producerRequest.correlationId, producerRequest.clientId, topicAndPartition, ite.getMessage)) - new ProduceResult(topicAndPartition, ite) - case utpe: UnknownTopicOrPartitionException => - warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( - producerRequest.correlationId, producerRequest.clientId, topicAndPartition, utpe.getMessage)) - new ProduceResult(topicAndPartition, utpe) - case nle: NotLeaderForPartitionException => - warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( - producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage)) - new ProduceResult(topicAndPartition, nle) - case nere: NotEnoughReplicasException => - warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( - producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nere.getMessage)) - new ProduceResult(topicAndPartition, nere) - case e: Throwable => - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() - BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() - error("Error processing ProducerRequest with correlation id %d from client %s on partition %s" - .format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition), e) - new ProduceResult(topicAndPartition, e) - } - } + // if the request is 
put into the purgatory, it will have a held reference + // and hence cannot be garbage collected; hence we clear its data here in + // order to let GC re-claim its memory since it is already appended to log + produceRequest.emptyData() } /** @@ -303,59 +206,38 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleFetchRequest(request: RequestChannel.Request) { val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] - val dataRead = replicaManager.readMessageSets(fetchRequest) - - // if the fetch request comes from the follower, - // update its corresponding log end offset - if(fetchRequest.isFromFollower) - recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset)) - - // check if this fetch request can be satisfied right away - val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum - val errorReadingData = dataRead.values.foldLeft(false)((errorIncurred, dataAndOffset) => - errorIncurred || (dataAndOffset.data.error != ErrorMapping.NoError)) - // send the data immediately if 1) fetch request does not want to wait - // 2) fetch request does not require any data - // 3) has enough data to respond - // 4) some error happens while reading data - if(fetchRequest.maxWait <= 0 || - fetchRequest.numPartitions <= 0 || - bytesReadable >= fetchRequest.minBytes || - errorReadingData) { - debug("Returning fetch response %s for fetch request with correlation id %d to client %s" - .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId)) - val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data)) - requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) - } else { - debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId, - fetchRequest.clientId)) - // create a list of (topic, partition) pairs to use as keys for this delayed request - val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_)) - val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, - dataRead.mapValues(_.offset)) - - // add the fetch request for watch if it's not satisfied, otherwise send the response back - val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch) - if (satisfiedByMe) - fetchRequestPurgatory.respond(delayedFetch) - } - } - private def recordFollowerLogEndOffsets(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) { - debug("Record follower log end offsets: %s ".format(offsets)) - offsets.foreach { - case (topicAndPartition, offset) => - replicaManager.updateReplicaLEOAndPartitionHW(topicAndPartition.topic, - topicAndPartition.partition, replicaId, offset) + // the callback for sending the response + def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) { + responsePartitionData.foreach { case (topicAndPartition, data) => + // we only print warnings for known errors here; if it is unknown, it will cause + // an error message in the replica manager already and hence can be ignored here + if (data.error != ErrorMapping.NoError && data.error != ErrorMapping.UnknownCode) { + debug("Fetch request with correlation id %d from client %s on partition %s failed due to %s" + .format(fetchRequest.correlationId, fetchRequest.clientId, + topicAndPartition, ErrorMapping.exceptionNameFor(data.error))) + } + + // record the bytes out 
metrics only when the response is being sent + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes) + } - // for producer requests with ack > 1, we need to check - // if they can be unblocked after some follower's log end offsets have moved - replicaManager.unblockDelayedProduceRequests(new TopicPartitionRequestKey(topicAndPartition)) + val response = FetchResponse(fetchRequest.correlationId, responsePartitionData) + requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) } + + // call the replica manager to fetch messages from the local replica + replicaManager.fetchMessages( + fetchRequest.maxWait.toLong, + fetchRequest.replicaId, + fetchRequest.minBytes, + fetchRequest.requestInfo, + sendResponseCallback) } /** - * Service the offset request API + * Handle an offset request */ def handleOffsetRequest(request: RequestChannel.Request) { val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest] @@ -387,15 +269,15 @@ class KafkaApis(val requestChannel: RequestChannel, // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages // are typically transient and there is no value in logging the entire stack trace for the same case utpe: UnknownTopicOrPartitionException => - warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition, utpe.getMessage)) (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), Nil) ) case nle: NotLeaderForPartitionException => - warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage)) (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) ) case e: Throwable => - warn("Error while responding to offset request", e) + error("Error while responding to offset request", e) (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) ) } }) @@ -415,7 +297,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { + private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { val segsArray = log.logSegments.toArray var offsetTimeArray: Array[(Long, Long)] = null if(segsArray.last.size > 0) @@ -488,7 +370,7 @@ class KafkaApis(val requestChannel: RequestChannel, } /** - * Service the topic metadata request API + * Handle a topic metadata request */ def handleTopicMetadataRequest(request: RequestChannel.Request) { val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest] @@ -500,7 +382,7 @@ class KafkaApis(val requestChannel: RequestChannel, } /* - * Service the Offset fetch API + * Handle an offset fetch request */ def handleOffsetFetchRequest(request: RequestChannel.Request) { val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest] @@ -520,7 +402,7 @@ 
class KafkaApis(val requestChannel: RequestChannel, } /* - * Service the consumer metadata API + * Handle a consumer metadata request */ def handleConsumerMetadataRequest(request: RequestChannel.Request) { val consumerMetadataRequest = request.requestObj.asInstanceOf[ConsumerMetadataRequest] @@ -545,9 +427,8 @@ class KafkaApis(val requestChannel: RequestChannel, } def close() { - debug("Shutting down.") - fetchRequestPurgatory.shutdown() - producerRequestPurgatory.shutdown() + // TODO currently closing the API is a no-op since the API no longer maintains any modules + // maybe remove the close call eventually once KafkaApis becomes a pure stateless layer debug("Shut down complete.") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/server/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 43eb2a3..2957bc4 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit import com.yammer.metrics.core.Gauge import org.I0Itec.zkclient.ZkClient +import kafka.api.ProducerResponseStatus /** @@ -144,6 +145,8 @@ class OffsetManager(val config: OffsetManagerConfig, trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition)) try { + // do not need to require acks since even if the tombstone is lost, + // it will be appended again in the next purge cycle partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*)) tombstones.size } @@ -192,13 +195,91 @@ class OffsetManager(val config: OffsetManagerConfig, offsetsCache.put(key, offsetAndMetadata) } - def putOffsets(group: String, offsets: Map[TopicAndPartition, OffsetAndMetadata]) { - // this method is called _after_ the offsets have been durably appended to the commit log, so there is no need to - // check for current leadership as we do for the offset fetch - trace("Putting offsets %s for group %s in offsets partition %d.".format(offsets, group, partitionFor(group))) - offsets.foreach { case (topicAndPartition, offsetAndMetadata) => - putOffset(GroupTopicPartition(group, topicAndPartition), offsetAndMetadata) + /* + * Check if the offset metadata length is valid + */ + def validateOffsetMetadataLength(metadata: String) : Boolean = { + metadata == null || metadata.length() <= config.maxMetadataSize + } + + /** + * Store offsets by appending them to the replicated log and then inserting them into the cache + */ + // TODO: generation id and consumer id are needed by the coordinator to do consumer checking in the future + def storeOffsets(groupName: String, + consumerId: String, + generationId: Int, + offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata], + responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) { + + // first filter out partitions with offset metadata size exceeding limit + // TODO: in the future we may want to only support atomic commit and hence fail the whole commit + val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) => + validateOffsetMetadataLength(offsetAndMetadata.metadata) } + + // construct the message set to append + val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => + new Message( + key = OffsetManager.offsetCommitKey(groupName,
topicAndPartition.topic, topicAndPartition.partition), + bytes = OffsetManager.offsetCommitValue(offsetAndMetadata) + ) + }.toSeq + + val offsetTopicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, partitionFor(groupName)) + + val offsetsAndMetadataMessageSet = Map(offsetTopicPartition -> + new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*)) + + // set the callback function to insert offsets into cache after log append completed + def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { + // the append response should only contain the offsets topic partition + if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition)) + throw new IllegalStateException("Append status %s should only have one partition %s" + .format(responseStatus, offsetTopicPartition)) + + // construct the commit response status and insert + // the offset and metadata into the cache iff the append status has no error + val status = responseStatus(offsetTopicPartition) + + val responseCode = + if (status.error == ErrorMapping.NoError) { + filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) => + putOffset(GroupTopicPartition(groupName, topicAndPartition), offsetAndMetadata) + } + ErrorMapping.NoError + } else { + debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s" + .format(filteredOffsetMetadata, groupName, consumerId, generationId, ErrorMapping.exceptionNameFor(status.error))) + + // transform the log append error code to the corresponding commit status error code + if (status.error == ErrorMapping.UnknownTopicOrPartitionCode) + ErrorMapping.ConsumerCoordinatorNotAvailableCode + else if (status.error == ErrorMapping.NotLeaderForPartitionCode) + ErrorMapping.NotCoordinatorForConsumerCode + else + status.error + } + + + // compute the final error codes for the commit response + val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => + if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) + (topicAndPartition, responseCode) + else + (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode) + } + + // finally trigger the callback logic passed from the API layer + responseCallback(commitStatus) + } + + // call replica manager to append the offset messages + replicaManager.appendMessages( + config.offsetCommitTimeoutMs.toLong, + config.offsetCommitRequiredAcks, + offsetsAndMetadataMessageSet, + putCacheCallback) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala deleted file mode 100644 index d4a7d4a..0000000 --- a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import kafka.metrics.KafkaMetricsGroup -import kafka.utils.Pool -import kafka.network.{BoundedByteBufferSend, RequestChannel} - -import java.util.concurrent.TimeUnit - -/** - * The purgatory holding delayed producer requests - */ -class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: OffsetManager, requestChannel: RequestChannel) - extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) { - this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId) - - private class DelayedProducerRequestMetrics(keyLabel: String = DelayedRequestKey.globalLabel) extends KafkaMetricsGroup { - val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS) - } - - private val producerRequestMetricsForKey = { - val valueFactory = (k: DelayedRequestKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-") - new Pool[DelayedRequestKey, DelayedProducerRequestMetrics](Some(valueFactory)) - } - - private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics - - private def recordDelayedProducerKeyExpired(key: DelayedRequestKey) { - val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key) - List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark()) - } - - /** - * Check if a specified delayed fetch request is satisfied - */ - def checkSatisfied(delayedProduce: DelayedProduce) = delayedProduce.isSatisfied(replicaManager) - - /** - * When a delayed produce request expires answer it with possible time out error codes - */ - def expire(delayedProduce: DelayedProduce) { - debug("Expiring produce request %s.".format(delayedProduce.produce)) - for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending) - recordDelayedProducerKeyExpired(new TopicPartitionRequestKey(topicPartition)) - respond(delayedProduce) - } - - // TODO: purgatory should not be responsible for sending back the responses - def respond(delayedProduce: DelayedProduce) { - val response = delayedProduce.respond(offsetManager) - requestChannel.sendResponse(new RequestChannel.Response(delayedProduce.request, new BoundedByteBufferSend(response))) - } -}
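For illustration, a minimal, self-contained sketch of the callback pattern that replaces the deleted purgatory wiring: the API layer builds a response callback and hands it to the replica manager, which invokes it once the append has completed. The names below (CallbackFlowSketch, SketchReplicaManager, storeOffsetsSketch, the Partition/ErrorCode aliases) and their simplified signatures are assumptions for the sketch only; the committed code uses TopicAndPartition, ProducerResponseStatus and the real ReplicaManager.appendMessages/fetchMessages shown in the diff above.

// Sketch only -- simplified stand-ins, not the committed Kafka API.
object CallbackFlowSketch {

  type Partition = String            // stand-in for TopicAndPartition
  type ErrorCode = Short
  val NoError: ErrorCode = 0.toShort

  // Stand-in for ReplicaManager.appendMessages: the caller supplies the response logic,
  // so the replica layer decides when to invoke it (immediately here; in the real broker,
  // when the delayed produce is satisfied or expires in the purgatory).
  class SketchReplicaManager {
    def appendMessages(timeoutMs: Long,
                       requiredAcks: Short,
                       messagesPerPartition: Map[Partition, Seq[String]],
                       responseCallback: Map[Partition, ErrorCode] => Unit): Unit = {
      // pretend the local append succeeded for every partition and respond right away
      responseCallback(messagesPerPartition.map { case (partition, _) => partition -> NoError })
    }
  }

  private val offsetCache = scala.collection.mutable.Map.empty[(String, Partition), Long]

  // Stand-in for OffsetManager.storeOffsets: wrap the API-layer callback in a cache-update
  // callback so the cache is only modified after the log append has completed.
  def storeOffsetsSketch(replicaManager: SketchReplicaManager,
                         group: String,
                         offsets: Map[Partition, Long],
                         responseCallback: Map[Partition, ErrorCode] => Unit): Unit = {

    def putCacheCallback(appendStatus: Map[Partition, ErrorCode]): Unit = {
      val commitStatus = offsets.map { case (partition, offset) =>
        val error = appendStatus.getOrElse(partition, NoError)
        if (error == NoError)
          offsetCache.put((group, partition), offset)
        partition -> error
      }
      // finally trigger the callback passed in from the API layer
      responseCallback(commitStatus)
    }

    val messages = offsets.map { case (partition, offset) => partition -> Seq(s"$group:$offset") }
    replicaManager.appendMessages(100L, (-1).toShort, messages, putCacheCallback)
  }

  def main(args: Array[String]): Unit = {
    storeOffsetsSketch(new SketchReplicaManager, "group-1", Map("topic-0" -> 42L),
      commitStatus => println("commit response: " + commitStatus))
  }
}

handleFetchRequest follows the same shape, passing sendResponseCallback into replicaManager.fetchMessages; with the response logic carried by these callbacks, FetchRequestPurgatory.scala and ProducerRequestPurgatory.scala no longer need their own respond/expire plumbing, which is why both files are removed in this commit.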