From: jjkoshy@apache.org To: commits@kafka.apache.org Date: Thu, 30 Oct 2014 01:57:05 -0000 Subject: [1/2] KAFKA-1583; Encapsulate replicated log implementation details into ReplicaManager and refactor KafkaApis; reviewed by Joel Koshy and Jun Rao Repository: kafka Updated Branches: refs/heads/trunk 20f5b01fe -> 89831204c http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 78b7514..02fa382 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -20,11 +20,11 @@ import kafka.api._ import kafka.common._ import kafka.utils._ import kafka.cluster.{Broker, Partition, Replica} -import kafka.log.LogManager +import kafka.log.{LogAppendInfo, LogManager} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController import kafka.common.TopicAndPartition -import kafka.message.MessageSet +import kafka.message.{ByteBufferMessageSet, MessageSet} import java.util.concurrent.atomic.AtomicBoolean import java.io.{IOException, File} @@ -39,14 +39,31 @@ import scala.Some import org.I0Itec.zkclient.ZkClient import com.yammer.metrics.core.Gauge -object ReplicaManager { - val HighWatermarkFilename = "replication-offset-checkpoint" +/* + * Result metadata of a log append operation on the log + */ +case class LogAppendResult(info: LogAppendInfo, error: Option[Throwable] = None) { + def errorCode = error match { + case None => ErrorMapping.NoError + case Some(e) => ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]) + } } -case class PartitionDataAndOffset(data: FetchResponsePartitionData, offset: LogOffsetMetadata) +/* + * Result metadata of a log read operation on the log + */ +case class LogReadResult(info: FetchDataInfo, hw: Long, readSize: Int, error: Option[Throwable] = None) { + def errorCode = error match { + case None => ErrorMapping.NoError + case Some(e) => ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]) + } +} +object ReplicaManager { + val 
HighWatermarkFilename = "replication-offset-checkpoint" +} -class ReplicaManager(val config: KafkaConfig, +class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, scheduler: Scheduler, @@ -64,8 +81,9 @@ class ReplicaManager(val config: KafkaConfig, this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: " val stateChangeLogger = KafkaController.stateChangeLogger - var producerRequestPurgatory: ProducerRequestPurgatory = null - var fetchRequestPurgatory: FetchRequestPurgatory = null + val producerRequestPurgatory = new RequestPurgatory[DelayedProduce](config.brokerId, config.producerPurgatoryPurgeIntervalRequests) + val fetchRequestPurgatory = new RequestPurgatory[DelayedFetch](config.brokerId, config.fetchPurgatoryPurgeIntervalRequests) + newGauge( "LeaderCount", @@ -100,37 +118,27 @@ class ReplicaManager(val config: KafkaConfig, } /** - * Initialize the replica manager with the request purgatory + * Try to complete some delayed produce requests with the request key; + * this can be triggered when: * - * TODO: will be removed in 0.9 where we refactor server structure - */ - - def initWithRequestPurgatory(producerRequestPurgatory: ProducerRequestPurgatory, fetchRequestPurgatory: FetchRequestPurgatory) { - this.producerRequestPurgatory = producerRequestPurgatory - this.fetchRequestPurgatory = fetchRequestPurgatory - } - - /** - * Unblock some delayed produce requests with the request key + * 1. The partition HW has changed (for acks = -1) + * 2. A follower replica's fetch operation is received (for acks > 1) */ - def unblockDelayedProduceRequests(key: DelayedRequestKey) { - val satisfied = producerRequestPurgatory.update(key) - debug("Request key %s unblocked %d producer requests." - .format(key.keyLabel, satisfied.size)) - - // send any newly unblocked responses - satisfied.foreach(producerRequestPurgatory.respond(_)) + def tryCompleteDelayedProduce(key: DelayedRequestKey) { + val completed = producerRequestPurgatory.checkAndComplete(key) + debug("Request key %s unblocked %d producer requests.".format(key.keyLabel, completed)) } /** - * Unblock some delayed fetch requests with the request key + * Try to complete some delayed fetch requests with the request key; + * this can be triggered when: + * + * 1. The partition HW has changed (for regular fetch) + * 2. 
A new message set is appended to the local log (for follower fetch) */ - def unblockDelayedFetchRequests(key: DelayedRequestKey) { - val satisfied = fetchRequestPurgatory.update(key) - debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, satisfied.size)) - - // send any newly unblocked responses - satisfied.foreach(fetchRequestPurgatory.respond(_)) + def tryCompleteDelayedFetch(key: DelayedRequestKey) { + val completed = fetchRequestPurgatory.checkAndComplete(key) + debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed)) } def startup() { @@ -237,74 +245,205 @@ class ReplicaManager(val config: KafkaConfig, } /** - * Read from all the offset details given and return a map of - * (topic, partition) -> PartitionData + * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas; + * the callback function will be triggered either when timeout or the required acks are satisfied */ - def readMessageSets(fetchRequest: FetchRequest) = { - val isFetchFromFollower = fetchRequest.isFromFollower - fetchRequest.requestInfo.map - { - case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) => - val partitionDataAndOffsetInfo = - try { - val (fetchInfo, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId) - BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes) - if (isFetchFromFollower) { - debug("Partition [%s,%d] received fetch request from follower %d" - .format(topic, partition, fetchRequest.replicaId)) - } - new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, fetchInfo.messageSet), fetchInfo.fetchOffset) - } catch { - // NOTE: Failed fetch requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException - // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request - // for a partition it is the leader for - case utpe: UnknownTopicOrPartitionException => - warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format( - fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage)) - new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata) - case nle: NotLeaderForPartitionException => - warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format( - fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage)) - new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata) - case t: Throwable => - BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() - BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark() - error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d. 
Possible cause: %s" - .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage)) - new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata) - } - (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo) + def appendMessages(timeout: Long, + requiredAcks: Short, + messagesPerPartition: Map[TopicAndPartition, MessageSet], + responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) { + + val sTime = SystemTime.milliseconds + val localProduceResults = appendToLocalLog(messagesPerPartition, requiredAcks) + debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) + + val produceStatus = localProduceResults.map{ case (topicAndPartition, result) => + topicAndPartition -> + ProducePartitionStatus( + result.info.lastOffset + 1, // required offset + ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status + } + + if(requiredAcks == 0 || + requiredAcks == 1 || + messagesPerPartition.size <= 0 || + localProduceResults.values.count(_.error.isDefined) == messagesPerPartition.size) { + // in case of the following we can respond immediately: + // + // 1. required acks = 0 or 1 + // 2. there is no data to append + // 3. all partition appends have failed + val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) + responseCallback(produceResponseStatus) + } else { + // create delayed produce operation + val produceMetadata = ProduceMetadata(requiredAcks, produceStatus) + val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback) + + // create a list of (topic, partition) pairs to use as keys for this delayed request + val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionRequestKey(_)).toSeq + + // try to complete the request immediately, otherwise put it into the purgatory + // this is because while the delayed request is being created, new requests may + // arrive which can make this request completable. 
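(Aside: a minimal, self-contained sketch of the "try to complete now, otherwise watch" contract that the tryCompleteElseWatch call below relies on. The SketchDelayedOp and SketchPurgatory names are invented for illustration and are not the patch's classes; unlike the real RequestPurgatory, this sketch checks the condition only once, keeps a flat watcher list per key, and omits the expiration reaper.)

import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

abstract class SketchDelayedOp {
  private val completed = new AtomicBoolean(false)

  /* completion logic; runs exactly once, always via forceComplete() */
  def onComplete(): Unit

  /* check the completion condition and call forceComplete() if it is met */
  def tryComplete(): Boolean

  final def isCompleted: Boolean = completed.get

  final def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true } else false
}

class SketchPurgatory[T <: SketchDelayedOp] {
  private val watchersForKey = mutable.Map.empty[Any, mutable.ListBuffer[T]]

  /* check once more before parking the operation, so a state change that raced
   * with its creation cannot leave it waiting until expiration */
  def tryCompleteElseWatch(op: T, keys: Seq[Any]): Boolean = synchronized {
    if (op.tryComplete()) true
    else {
      keys.foreach(k => watchersForKey.getOrElseUpdate(k, mutable.ListBuffer.empty[T]) += op)
      false
    }
  }

  /* called when state tied to `key` changes, e.g. a follower fetch advances the partition HW */
  def checkAndComplete(key: Any): Int = synchronized {
    watchersForKey.get(key) match {
      case None => 0
      case Some(ops) =>
        val completedNow = ops.filter(_.tryComplete())
        ops --= completedNow
        completedNow.size
    }
  }
}

A DelayedProduce-style subclass would make tryComplete() return true once every partition in the request has either been fully replicated or hit an error, and onComplete() would fire the produce response callback, which is what the patch arranges below.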
+ producerRequestPurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) + } + } + + /** + * Append the messages to the local replica logs + */ + private def appendToLocalLog(messagesPerPartition: Map[TopicAndPartition, MessageSet], + requiredAcks: Short): Map[TopicAndPartition, LogAppendResult] = { + trace("Append [%s] to local log ".format(messagesPerPartition)) + messagesPerPartition.map { case (topicAndPartition, messages) => + try { + val partitionOpt = getPartition(topicAndPartition.topic, topicAndPartition.partition) + val info = partitionOpt match { + case Some(partition) => + partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], requiredAcks) + case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" + .format(topicAndPartition, localBrokerId)) + } + + val numAppendedMessages = + if (info.firstOffset == -1L || info.lastOffset == -1L) + 0 + else + info.lastOffset - info.firstOffset + 1 + + // update stats for successfully appended bytes and messages as bytesInRate and messageInRate + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages) + BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages) + + trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" + .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset)) + (topicAndPartition, LogAppendResult(info)) + } catch { + // NOTE: Failed produce requests metric is not incremented for known exceptions + // it is supposed to indicate un-expected failures of a broker in handling a produce request + case e: KafkaStorageException => + fatal("Halting due to unrecoverable I/O error while handling produce request: ", e) + Runtime.getRuntime.halt(1) + (topicAndPartition, null) + case utpe: UnknownTopicOrPartitionException => + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe))) + case nle: NotLeaderForPartitionException => + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle))) + case e: Throwable => + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() + BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() + error("Error processing append operation on partition %s".format(topicAndPartition), e) + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) + } + } + } + + /** + * Fetch messages from the leader replica, and wait until enough data can be fetched and return; + * the callback function will be triggered either when timeout or required fetch info is satisfied + */ + def fetchMessages(timeout: Long, + replicaId: Int, + fetchMinBytes: Int, + fetchInfo: Map[TopicAndPartition, PartitionFetchInfo], + responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) { + + val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId + val fetchOnlyCommitted: Boolean = ! 
Request.isValidBrokerId(replicaId) + + // read from local logs + val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo) + + // if the fetch comes from the follower, + // update its corresponding log end offset + if(Request.isValidBrokerId(replicaId)) + updateFollowerLEOs(replicaId, logReadResults.mapValues(_.info.fetchOffset)) + + // check if this fetch request can be satisfied right away + val bytesReadable = logReadResults.values.map(_.info.messageSet.sizeInBytes).sum + val errorReadingData = logReadResults.values.foldLeft(false) ((errorIncurred, readResult) => + errorIncurred || (readResult.errorCode != ErrorMapping.NoError)) + + // respond immediately if 1) fetch request does not want to wait + // 2) fetch request does not require any data + // 3) has enough data to respond + // 4) some error happens while reading data + if(timeout <= 0 || fetchInfo.size <= 0 || bytesReadable >= fetchMinBytes || errorReadingData) { + val fetchPartitionData = logReadResults.mapValues(result => + FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet)) + responseCallback(fetchPartitionData) + } else { + // construct the fetch results from the read results + val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) => + (topicAndPartition, FetchPartitionStatus(result.info.fetchOffset, fetchInfo.get(topicAndPartition).get)) + } + val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted, fetchPartitionStatus) + val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback) + + // create a list of (topic, partition) pairs to use as keys for this delayed request + val delayedFetchKeys = fetchPartitionStatus.keys.map(new TopicPartitionRequestKey(_)).toSeq + + // try to complete the request immediately, otherwise put it into the purgatory; + // this is because while the delayed request is being created, new requests may + // arrive which can make this request completable. 
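(Aside: to make the min-bytes wait concrete, a stripped-down sketch of what a delayed fetch's completion check can look like. SketchDelayedFetch, bytesAccumulated() and respond() are invented stand-ins; the patch's DelayedFetch instead re-reads the watched partitions' log offsets to decide whether fetchMinBytes can now be satisfied.)

import java.util.concurrent.atomic.AtomicBoolean

/* Illustrative only -- not the patch's DelayedFetch class. */
class SketchDelayedFetch(fetchMinBytes: Int,
                         bytesAccumulated: () => Int,   // stand-in for re-reading log end offsets
                         respond: () => Unit) {
  private val completed = new AtomicBoolean(false)

  /* re-run whenever a watched partition changes: the HW moves (regular fetch)
   * or new messages are appended to the leader log (follower fetch) */
  def tryComplete(): Boolean =
    if (bytesAccumulated() >= fetchMinBytes) forceComplete() else false

  /* also invoked by the expiration reaper once the maximum wait time elapses,
   * so the fetch responds with whatever data is available at that point */
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { respond(); true } else false
}

The immediate-response branch above (timeout <= 0, empty request, bytesReadable >= fetchMinBytes, or a read error) covers exactly the cases where such an operation would complete on its first check and never need to be watched.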
+ fetchRequestPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) } } /** * Read from a single topic/partition at the given offset upto maxSize bytes */ - private def readMessageSet(topic: String, - partition: Int, - offset: Long, - maxSize: Int, - fromReplicaId: Int): (FetchDataInfo, Long) = { - // check if the current broker is the leader for the partitions - val localReplica = if(fromReplicaId == Request.DebuggingConsumerId) - getReplicaOrException(topic, partition) - else - getLeaderReplicaIfLocal(topic, partition) - trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) - val maxOffsetOpt = - if (Request.isValidBrokerId(fromReplicaId)) - None - else - Some(localReplica.highWatermark.messageOffset) - val fetchInfo = localReplica.log match { - case Some(log) => - log.read(offset, maxSize, maxOffsetOpt) - case None => - error("Leader for partition [%s,%d] does not have a local log".format(topic, partition)) - FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty) + def readFromLocalLog(fetchOnlyFromLeader: Boolean, + readOnlyCommitted: Boolean, + readPartitionInfo: Map[TopicAndPartition, PartitionFetchInfo]): Map[TopicAndPartition, LogReadResult] = { + + readPartitionInfo.map { case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) => + val partitionDataAndOffsetInfo = + try { + trace("Fetching log segment for topic %s, partition %d, offset %d, size %d".format(topic, partition, offset, fetchSize)) + + // decide whether to only fetch from leader + val localReplica = if (fetchOnlyFromLeader) + getLeaderReplicaIfLocal(topic, partition) + else + getReplicaOrException(topic, partition) + + // decide whether to only fetch committed data (i.e. messages below high watermark) + val maxOffsetOpt = if (readOnlyCommitted) + Some(localReplica.highWatermark.messageOffset) + else + None + + // read on log + val logReadInfo = localReplica.log match { + case Some(log) => + log.read(offset, fetchSize, maxOffsetOpt) + case None => + error("Leader for partition [%s,%d] does not have a local log".format(topic, partition)) + FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty) + } + + LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, fetchSize, None) + } catch { + // NOTE: Failed fetch requests metric is not incremented for known exceptions since it + // is supposed to indicate un-expected failure of a broker in handling a fetch request + case utpe: UnknownTopicOrPartitionException => + LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(utpe)) + case nle: NotLeaderForPartitionException => + LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(nle)) + case rnae: ReplicaNotAvailableException => + LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(rnae)) + case e: Throwable => + BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() + BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark() + error("Error processing fetch operation on partition [%s,%d] offset %d".format(topic, partition, offset)) + LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(e)) + } + (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo) } - (fetchInfo, localReplica.highWatermark.messageOffset) } def 
maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) { @@ -557,32 +696,27 @@ class ReplicaManager(val config: KafkaConfig, allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages)) } - def updateReplicaLEOAndPartitionHW(topic: String, partitionId: Int, replicaId: Int, offset: LogOffsetMetadata) = { - getPartition(topic, partitionId) match { - case Some(partition) => - partition.getReplica(replicaId) match { - case Some(replica) => - replica.logEndOffset = offset - // check if we need to update HW and expand Isr - partition.updateLeaderHWAndMaybeExpandIsr(replicaId) - debug("Recorded follower %d position %d for partition [%s,%d].".format(replicaId, offset.messageOffset, topic, partitionId)) - case None => - throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" + - " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId, - offset.messageOffset, partition.assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId)) - - } - case None => - warn("While recording the follower position, the partition [%s,%d] hasn't been created, skip updating leader HW".format(topic, partitionId)) + private def updateFollowerLEOs(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) { + debug("Recording follower broker %d log end offsets: %s ".format(replicaId, offsets)) + offsets.foreach { case (topicAndPartition, offset) => + getPartition(topicAndPartition.topic, topicAndPartition.partition) match { + case Some(partition) => + partition.updateReplicaLEO(replicaId, offset) + + // for producer requests with ack > 1, we need to check + // if they can be unblocked after some follower's log end offsets have moved + tryCompleteDelayedProduce(new TopicPartitionRequestKey(topicAndPartition)) + case None => + warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicAndPartition)) + } } } private def getLeaderPartitions() : List[Partition] = { allPartitions.values.filter(_.leaderReplicaIfLocal().isDefined).toList } - /** - * Flushes the highwatermark value for all partitions to the highwatermark file - */ + + // Flushes the highwatermark value for all partitions to the highwatermark file def checkpointHighWatermarks() { val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica} val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath) @@ -598,10 +732,14 @@ class ReplicaManager(val config: KafkaConfig, } } - def shutdown() { - info("Shut down") + // High watermark do not need to be checkpointed only when under unit tests + def shutdown(checkpointHW: Boolean = true) { + info("Shutting down") replicaFetcherManager.shutdown() - checkpointHighWatermarks() + fetchRequestPurgatory.shutdown() + producerRequestPurgatory.shutdown() + if (checkpointHW) + checkpointHighWatermarks() info("Shut down completely") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/server/RequestPurgatory.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index 9d76234..323b12e 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ 
b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -17,7 +17,6 @@ package kafka.server -import kafka.network._ import kafka.utils._ import kafka.metrics.KafkaMetricsGroup @@ -30,50 +29,75 @@ import com.yammer.metrics.core.Gauge /** - * A request whose processing needs to be delayed for at most the given delayMs - * The associated keys are used for bookeeping, and represent the "trigger" that causes this request to check if it is satisfied, - * for example a key could be a (topic, partition) pair. + * An operation whose processing needs to be delayed for at most the given delayMs. For example + * a delayed produce operation could be waiting for specified number of acks; or + * a delayed fetch operation could be waiting for a given number of bytes to accumulate. + * + * The logic upon completing a delayed operation is defined in onComplete() and will be called exactly once. + * Once an operation is completed, isCompleted() will return true. onComplete() can be triggered by either + * forceComplete(), which forces calling onComplete() after delayMs if the operation is not yet completed, + * or tryComplete(), which first checks if the operation can be completed or not now, and if yes calls + * forceComplete(). + * + * A subclass of DelayedRequest needs to provide an implementation of both onComplete() and tryComplete(). */ -class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) { - val satisfied = new AtomicBoolean(false) +abstract class DelayedRequest(delayMs: Long) extends DelayedItem(delayMs) { + private val completed = new AtomicBoolean(false) + + /* + * Force completing the delayed operation, if not already completed. + * This function can be triggered when + * + * 1. The operation has been verified to be completable inside tryComplete() + * 2. The operation has expired and hence needs to be completed right now + * + * Return true iff the operation is completed by the caller + */ + def forceComplete(): Boolean = { + if (completed.compareAndSet(false, true)) { + onComplete() + true + } else { + false + } + } + + /** + * Check if the delayed operation is already completed + */ + def isCompleted(): Boolean = completed.get() + + /** + * Process for completing an operation; This function needs to be defined in subclasses + * and will be called exactly once in forceComplete() + */ + def onComplete(): Unit + + /* + * Try to complete the delayed operation by first checking if the operation + * can be completed by now. If yes execute the completion logic by calling + * forceComplete() and return true iff forceComplete returns true; otherwise return false + * + * Note that concurrent threads can check if an operation can be completed or not, + * but only the first thread will succeed in completing the operation and return + * true, others will still return false + * + * this function needs to be defined in subclasses + */ + def tryComplete(): Boolean } /** - * A helper class for dealing with asynchronous requests with a timeout. A DelayedRequest has a request to delay - * and also a list of keys that can trigger the action. Implementations can add customized logic to control what it means for a given - * request to be satisfied. 
For example it could be that we are waiting for user-specified number of acks on a given (topic, partition) - * to be able to respond to a request or it could be that we are waiting for a given number of bytes to accumulate on a given request - * to be able to respond to that request (in the simple case we might wait for at least one byte to avoid busy waiting). - * - * For us the key is generally a (topic, partition) pair. - * By calling - * val isSatisfiedByMe = checkAndMaybeWatch(delayedRequest) - * we will check if a request is satisfied already, and if not add the request for watch on all its keys. - * - * It is up to the user to then call - * val satisfied = update(key, request) - * when a request relevant to the given key occurs. This triggers bookeeping logic and returns back any requests satisfied by this - * new request. - * - * An implementation provides extends two helper functions - * def checkSatisfied(request: R, delayed: T): Boolean - * this function returns true if the given request (in combination with whatever previous requests have happened) satisfies the delayed - * request delayed. This method will likely also need to do whatever bookkeeping is necessary. - * - * The second function is - * def expire(delayed: T) - * this function handles delayed requests that have hit their time limit without being satisfied. - * + * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations. */ -abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000) +class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000) extends Logging with KafkaMetricsGroup { /* a list of requests watching each key */ private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) /* background thread expiring requests that have been waiting too long */ - private val expiredRequestReaper = new ExpiredRequestReaper - private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false) + private val expirationReaper = new ExpiredOperationReaper newGauge( "PurgatorySize", @@ -89,230 +113,182 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt } ) - expirationThread.start() + expirationReaper.start() /** - * Try to add the request for watch on all keys. Return true iff the request is - * satisfied and the satisfaction is done by the caller. + * Check if the operation can be completed, if not watch it based on the given watch keys + * + * Note that a delayed operation can be watched on multiple keys. It is possible that + * an operation is completed after it has been added to the watch list for some, but + * not all of the keys. In this case, the operation is considered completed and won't + * be added to the watch list of the remaining keys. The expiration reaper thread will + * remove this operation from any watcher list in which the operation exists. * - * Requests can be watched on only a few of the keys if it is found satisfied when - * trying to add it to each one of the keys. In this case the request is still treated as satisfied - * and hence no longer watched. Those already added elements will be later purged by the expire reaper. 
+ * @param operation the delayed operation to be checked + * @param watchKeys keys for bookkeeping the operation + * @return true iff the delayed operations can be completed by the caller */ - def checkAndMaybeWatch(delayedRequest: T): Boolean = { - for(key <- delayedRequest.keys) { - val lst = watchersFor(key) - if(!lst.checkAndMaybeAdd(delayedRequest)) { - if(delayedRequest.satisfied.compareAndSet(false, true)) - return true - else - return false + def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = { + for(key <- watchKeys) { + // if the operation is already completed, stopping adding it to + // any further lists and return false + if (operation.isCompleted()) + return false + val watchers = watchersFor(key) + // if the operation can by completed by myself, stop adding it to + // any further lists and return true immediately + if(operation synchronized operation.tryComplete()) { + return true + } else { + watchers.watch(operation) } } - // if it is indeed watched, add to the expire queue also - expiredRequestReaper.enqueue(delayedRequest) + // if it cannot be completed by now and hence is watched, add to the expire queue also + if (! operation.isCompleted()) { + expirationReaper.enqueue(operation) + } false } /** - * Update any watchers and return a list of newly satisfied requests. + * Check if some some delayed requests can be completed with the given watch key, + * and if yes complete them. + * + * @return the number of completed requests during this process */ - def update(key: Any): Seq[T] = { - val w = watchersForKey.get(key) - if(w == null) - Seq.empty + def checkAndComplete(key: Any): Int = { + val watchers = watchersForKey.get(key) + if(watchers == null) + 0 else - w.collectSatisfiedRequests() + watchers.tryCompleteWatched() } - /* - * Return the size of the watched lists in the purgatory, which is the size of watch lists. - * Since an operation may still be in the watch lists even when it has been completed, - * this number may be larger than the number of real operations watched + /** + * Return the total size of watch lists the purgatory. Since an operation may be watched + * on multiple lists, and some of its watched entries may still be in the watch lists + * even when it has been completed, this number may be larger than the number of real operations watched */ def watched() = watchersForKey.values.map(_.watched).sum - /* - * Return the number of requests in the expiry reaper's queue + /** + * Return the number of delayed operations in the expiry queue */ - def delayed() = expiredRequestReaper.delayed() + def delayed() = expirationReaper.delayed /* - * Return the watch list for the given watch key + * Return the watch list of the given key */ private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key) - - /** - * Check if this delayed request is already satisfied - */ - protected def checkSatisfied(request: T): Boolean - - /** - * Handle an expired delayed request - */ - protected def expire(delayed: T) /** * Shutdown the expire reaper thread */ def shutdown() { - expiredRequestReaper.shutdown() + expirationReaper.shutdown() } /** - * A linked list of DelayedRequests watching some key with some associated - * bookkeeping logic. 
+ * A linked list of watched delayed operations based on some key */ private class Watchers { private val requests = new util.LinkedList[T] - // return the size of the watch list - def watched() = requests.size() + def watched = requests.size() - // potentially add the element to watch if it is not satisfied yet - def checkAndMaybeAdd(t: T): Boolean = { + // add the element to watch + def watch(t: T) { synchronized { - // if it is already satisfied, do not add to the watch list - if (t.satisfied.get) - return false - // synchronize on the delayed request to avoid any race condition - // with expire and update threads on client-side. - if(t synchronized checkSatisfied(t)) { - return false - } requests.add(t) - return true } } - // traverse the list and purge satisfied elements - def purgeSatisfied(): Int = { + // traverse the list and try to complete some watched elements + def tryCompleteWatched(): Int = { + var completed = 0 synchronized { val iter = requests.iterator() - var purged = 0 while(iter.hasNext) { val curr = iter.next - if(curr.satisfied.get()) { + if (curr.isCompleted()) { + // another thread has completed this request, just remove it iter.remove() - purged += 1 + } else { + if(curr synchronized curr.tryComplete()) { + iter.remove() + completed += 1 + } } } - purged } + completed } - // traverse the list and try to satisfy watched elements - def collectSatisfiedRequests(): Seq[T] = { - val response = new mutable.ArrayBuffer[T] + // traverse the list and purge elements that are already completed by others + def purgeCompleted(): Int = { + var purged = 0 synchronized { val iter = requests.iterator() - while(iter.hasNext) { + while (iter.hasNext) { val curr = iter.next - if(curr.satisfied.get) { - // another thread has satisfied this request, remove it + if(curr.isCompleted()) { iter.remove() - } else { - // synchronize on curr to avoid any race condition with expire - // on client-side. 
- val satisfied = curr synchronized checkSatisfied(curr) - if(satisfied) { - iter.remove() - val updated = curr.satisfied.compareAndSet(false, true) - if(updated == true) { - response += curr - } - } + purged += 1 } } } - response + purged } } /** - * Runnable to expire requests that have sat unfullfilled past their deadline + * A background reaper to expire delayed operations that have timed out */ - private class ExpiredRequestReaper extends Runnable with Logging { - this.logIdent = "ExpiredRequestReaper-%d ".format(brokerId) - private val running = new AtomicBoolean(true) - private val shutdownLatch = new CountDownLatch(1) + private class ExpiredOperationReaper extends ShutdownableThread( + "ExpirationReaper-%d".format(brokerId), + false) { + /* The queue storing all delayed operations */ private val delayedQueue = new DelayQueue[T] + /* + * Return the number of delayed operations kept by the reaper + */ def delayed() = delayedQueue.size() - - /** Main loop for the expiry thread */ - def run() { - while(running.get) { - try { - val curr = pollExpired() - if (curr != null) { - curr synchronized { - expire(curr) - } - } - // see if we need to purge the watch lists - if (RequestPurgatory.this.watched() >= purgeInterval) { - debug("Begin purging watch lists") - val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum - debug("Purged %d elements from watch lists.".format(numPurgedFromWatchers)) - } - // see if we need to purge the delayed request queue - if (delayed() >= purgeInterval) { - debug("Begin purging delayed queue") - val purged = purgeSatisfied() - debug("Purged %d requests from delayed queue.".format(purged)) - } - } catch { - case e: Exception => - error("Error in long poll expiry thread: ", e) - } - } - shutdownLatch.countDown() - } - /** Add a request to be expired */ + /* + * Add an operation to be expired + */ def enqueue(t: T) { delayedQueue.add(t) } - /** Shutdown the expiry thread*/ - def shutdown() { - debug("Shutting down.") - running.set(false) - shutdownLatch.await() - debug("Shut down complete.") - } - /** - * Get the next expired event + * Try to get the next expired event and force completing it */ - private def pollExpired(): T = { - while(true) { - val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS) - if (curr == null) - return null.asInstanceOf[T] - val updated = curr.satisfied.compareAndSet(false, true) - if(updated) { - return curr + private def expireNext() { + val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS) + if (curr != null.asInstanceOf[T]) { + // if there is an expired operation, try to force complete it + if (curr synchronized curr.forceComplete()) { + debug("Force complete expired delayed operation %s".format(curr)) } } - throw new RuntimeException("This should not happen") } /** * Delete all satisfied events from the delay queue and the watcher lists */ - private def purgeSatisfied(): Int = { + private def purgeCompleted(): Int = { var purged = 0 // purge the delayed queue val iter = delayedQueue.iterator() - while(iter.hasNext) { + while (iter.hasNext) { val curr = iter.next() - if(curr.satisfied.get) { + if (curr.isCompleted()) { iter.remove() purged += 1 } @@ -320,6 +296,22 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt purged } - } + override def doWork() { + // try to get the next expired operation and force completing it + expireNext() + // see if we need to purge the watch lists + if (RequestPurgatory.this.watched() >= purgeInterval) { + debug("Begin purging watch lists") 
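// [Illustrative annotation, not part of the patch] Completing an operation only removes it
// from the watcher list whose key triggered the completion; its entries under any other keys
// it was watched on, and entries for operations completed by expiration, linger until this
// size-triggered sweep (watched() >= purgeInterval) drops everything already marked completed.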
+ val purged = watchersForKey.values.map(_.purgeCompleted()).sum + debug("Purged %d elements from watch lists.".format(purged)) + } + // see if we need to purge the delayed request queue + if (delayed() >= purgeInterval) { + debug("Begin purging delayed queue") + val purged = purgeCompleted() + debug("Purged %d operations from delayed queue.".format(purged)) + } + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/main/scala/kafka/utils/DelayedItem.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/DelayedItem.scala b/core/src/main/scala/kafka/utils/DelayedItem.scala index d727649..a4e0dab 100644 --- a/core/src/main/scala/kafka/utils/DelayedItem.scala +++ b/core/src/main/scala/kafka/utils/DelayedItem.scala @@ -20,7 +20,7 @@ package kafka.utils import java.util.concurrent._ import scala.math._ -class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed with Logging { +class DelayedItem(delay: Long, unit: TimeUnit) extends Delayed with Logging { val createdMs = SystemTime.milliseconds val delayMs = { @@ -29,8 +29,8 @@ class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed w else given } - def this(item: T, delayMs: Long) = - this(item, delayMs, TimeUnit.MILLISECONDS) + def this(delayMs: Long) = + this(delayMs, TimeUnit.MILLISECONDS) /** * The remaining delay time @@ -41,7 +41,7 @@ class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed w } def compareTo(d: Delayed): Int = { - val delayed = d.asInstanceOf[DelayedItem[T]] + val delayed = d.asInstanceOf[DelayedItem] val myEnd = createdMs + delayMs val yourEnd = delayed.createdMs + delayed.delayMs http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 209a409..8531f53 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -305,13 +305,11 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes @Test def testNotEnoughReplicas() { val topicName = "minisrtest" - val topicProps = new Properties(); - topicProps.put("min.insync.replicas","3"); - + val topicProps = new Properties() + topicProps.put("min.insync.replicas","3") TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps) - val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes) try { producer3.send(record).get @@ -327,18 +325,16 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes @Test def testNotEnoughReplicasAfterBrokerShutdown() { val topicName = "minisrtest2" - val topicProps = new Properties(); - topicProps.put("min.insync.replicas","2"); - + val topicProps = new Properties() + topicProps.put("min.insync.replicas","2") TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps) - val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes) - // This should work + // this should work with all brokers up and running producer3.send(record).get - //shut down one broker + // shut down one broker servers.head.shutdown() servers.head.awaitShutdown() try { @@ -351,8 +347,8 
@@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes } } + // restart the server servers.head.startup() - } private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index fb61d55..d60d8e0 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -237,8 +237,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { props.put("request.required.acks", "-1") val producer = new SyncProducer(new SyncProducerConfig(props)) - val topicProps = new Properties(); - topicProps.put("min.insync.replicas","2"); + val topicProps = new Properties() + topicProps.put("min.insync.replicas","2") AdminUtils.createTopic(zkClient, topicName, 1, 1,topicProps) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicName, 0) http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 03a424d..8913fc1 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -74,6 +74,9 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { fooPartition0Hw = hwmFor(replicaManager, topic, 0) assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw) EasyMock.verify(zkClient) + + // shutdown the replica manager upon test completion + replicaManager.shutdown(false) } def testHighWatermarkPersistenceMultiplePartitions() { @@ -130,6 +133,10 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) assertEquals(10L, topic1Partition0Hw) EasyMock.verify(zkClient) + + // shutdown the replica manager upon test completion + replicaManager.shutdown(false) + } def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = { http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index cd302aa..a703d27 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -36,8 +36,21 @@ class IsrExpirationTest extends JUnit3Suite { }) val topic = "foo" + val time = new MockTime + + var replicaManager: ReplicaManager = null + + override def setUp() { + super.setUp() + replicaManager = new ReplicaManager(configs.head, time, null, null, null, new AtomicBoolean(false)) + } + + override def tearDown() { + replicaManager.shutdown(false) + super.tearDown() + } + def testIsrExpirationForStuckFollowers() { - val time = new MockTime val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for 
leader to 15L // create one partition and all replicas @@ -61,7 +74,6 @@ class IsrExpirationTest extends JUnit3Suite { } def testIsrExpirationForSlowFollowers() { - val time = new MockTime // create leader replica val log = getLogWithLogEndOffset(15L, 1) // add one partition @@ -82,7 +94,6 @@ class IsrExpirationTest extends JUnit3Suite { private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig, localLog: Log): Partition = { val leaderId=config.brokerId - val replicaManager = new ReplicaManager(config, time, null, null, null, new AtomicBoolean(false)) val partition = replicaManager.getOrCreatePartition(topic, partitionId) val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog)) http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index a9c4ddc..faa9071 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -42,6 +42,9 @@ class ReplicaManagerTest extends JUnit3Suite { val partition = rm.getOrCreatePartition(topic, 1) partition.getOrCreateReplica(1) rm.checkpointHighWatermarks() + + // shutdown the replica manager upon test completion + rm.shutdown(false) } @Test @@ -56,5 +59,8 @@ class ReplicaManagerTest extends JUnit3Suite { val partition = rm.getOrCreatePartition(topic, 1) partition.getOrCreateReplica(1) rm.checkpointHighWatermarks() + + // shutdown the replica manager upon test completion + rm.shutdown(false) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala index a577f4a..a7720d5 100644 --- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala @@ -17,24 +17,18 @@ package kafka.server -import scala.collection._ import org.junit.Test +import org.scalatest.junit.JUnit3Suite import junit.framework.Assert._ -import kafka.message._ -import kafka.api._ import kafka.utils.TestUtils -import org.scalatest.junit.JUnit3Suite - class RequestPurgatoryTest extends JUnit3Suite { - val producerRequest1 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello1".getBytes))) - val producerRequest2 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello2".getBytes))) - var purgatory: MockRequestPurgatory = null + var purgatory: RequestPurgatory[MockDelayedRequest] = null override def setUp() { super.setUp() - purgatory = new MockRequestPurgatory(5) + purgatory = new RequestPurgatory[MockDelayedRequest](0, 5) } override def tearDown() { @@ -44,58 +38,59 @@ class RequestPurgatoryTest extends JUnit3Suite { @Test def testRequestSatisfaction() { - val r1 = new DelayedRequest(Array("test1"), null, 100000L) - val r2 = new DelayedRequest(Array("test2"), null, 100000L) - assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1").size) - assertFalse("r1 not satisfied and hence watched", 
purgatory.checkAndMaybeWatch(r1)) - assertEquals("Still nothing satisfied", 0, purgatory.update("test1").size) - assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2)) - assertEquals("Still nothing satisfied", 0, purgatory.update("test2").size) - purgatory.satisfied += r1 - assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1")) - assertEquals("Nothing satisfied", 0, purgatory.update("test1").size) - purgatory.satisfied += r2 - assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2")) - assertEquals("Nothing satisfied", 0, purgatory.update("test2").size) + val r1 = new MockDelayedRequest(100000L) + val r2 = new MockDelayedRequest(100000L) + assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.checkAndComplete("test1")) + assertFalse("r1 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r1, Array("test1"))) + assertEquals("Still nothing satisfied", 0, purgatory.checkAndComplete("test1")) + assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2"))) + assertEquals("Still nothing satisfied", 0, purgatory.checkAndComplete("test2")) + r1.completable = true + assertEquals("r1 satisfied", 1, purgatory.checkAndComplete("test1")) + assertEquals("Nothing satisfied", 0, purgatory.checkAndComplete("test1")) + r2.completable = true + assertEquals("r2 satisfied", 1, purgatory.checkAndComplete("test2")) + assertEquals("Nothing satisfied", 0, purgatory.checkAndComplete("test2")) } @Test def testRequestExpiry() { val expiration = 20L - val r1 = new DelayedRequest(Array("test1"), null, expiration) - val r2 = new DelayedRequest(Array("test1"), null, 200000L) + val r1 = new MockDelayedRequest(expiration) + val r2 = new MockDelayedRequest(200000L) val start = System.currentTimeMillis - assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1)) - assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2)) - purgatory.awaitExpiration(r1) + assertFalse("r1 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r1, Array("test1"))) + assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2"))) + r1.awaitExpiration() val elapsed = System.currentTimeMillis - start - assertTrue("r1 expired", purgatory.expired.contains(r1)) - assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2)) + assertTrue("r1 completed due to expiration", r1.isCompleted()) + assertFalse("r2 hasn't completed", r2.isCompleted()) assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration) } @Test def testRequestPurge() { - val r1 = new DelayedRequest(Array("test1"), null, 100000L) - val r12 = new DelayedRequest(Array("test1", "test2"), null, 100000L) - val r23 = new DelayedRequest(Array("test2", "test3"), null, 100000L) - purgatory.checkAndMaybeWatch(r1) - purgatory.checkAndMaybeWatch(r12) - purgatory.checkAndMaybeWatch(r23) + val r1 = new MockDelayedRequest(100000L) + val r2 = new MockDelayedRequest(100000L) + purgatory.tryCompleteElseWatch(r1, Array("test1")) + purgatory.tryCompleteElseWatch(r2, Array("test1", "test2")) + purgatory.tryCompleteElseWatch(r1, Array("test2", "test3")) assertEquals("Purgatory should have 5 watched elements", 5, purgatory.watched()) assertEquals("Purgatory should have 3 total delayed requests", 3, purgatory.delayed()) - // satisfy one of the requests, it should then be purged from the watch list with purge 
interval 5 - r12.satisfied.set(true) + // complete one of the operations, it should + // eventually be purged from the watch list with purge interval 5 + r2.completable = true + r2.tryComplete() TestUtils.waitUntilTrue(() => purgatory.watched() == 3, - "Purgatory should have 3 watched elements instead of " + + purgatory.watched(), 1000L) + "Purgatory should have 3 watched elements instead of " + purgatory.watched(), 1000L) TestUtils.waitUntilTrue(() => purgatory.delayed() == 3, "Purgatory should still have 3 total delayed requests instead of " + purgatory.delayed(), 1000L) // add two more requests, then the satisfied request should be purged from the delayed queue with purge interval 5 - purgatory.checkAndMaybeWatch(r1) - purgatory.checkAndMaybeWatch(r1) + purgatory.tryCompleteElseWatch(r1, Array("test1")) + purgatory.tryCompleteElseWatch(r1, Array("test1")) TestUtils.waitUntilTrue(() => purgatory.watched() == 5, "Purgatory should have 5 watched elements instead of " + purgatory.watched(), 1000L) @@ -103,19 +98,25 @@ class RequestPurgatoryTest extends JUnit3Suite { "Purgatory should have 4 total delayed requests instead of " + purgatory.delayed(), 1000L) } - class MockRequestPurgatory(purge: Int) extends RequestPurgatory[DelayedRequest](purgeInterval = purge) { - val satisfied = mutable.Set[DelayedRequest]() - val expired = mutable.Set[DelayedRequest]() - def awaitExpiration(delayed: DelayedRequest) = { - delayed synchronized { - delayed.wait() + class MockDelayedRequest(delayMs: Long) extends DelayedRequest(delayMs) { + var completable = false + + def awaitExpiration() { + synchronized { + wait() } } - def checkSatisfied(delayed: DelayedRequest): Boolean = satisfied.contains(delayed) - def expire(delayed: DelayedRequest) { - expired += delayed - delayed synchronized { - delayed.notify() + + override def tryComplete() = { + if (completable) + forceComplete() + else + false + } + + override def onComplete() { + synchronized { + notify() } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 3804a11..1bfb501 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -106,7 +106,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { val newProps = TestUtils.createBrokerConfig(0, port) newProps.setProperty("delete.topic.enable", "true") val newConfig = new KafkaConfig(newProps) - var server = new KafkaServer(newConfig) + val server = new KafkaServer(newConfig) server.startup() server.shutdown() server.awaitShutdown() http://git-wip-us.apache.org/repos/asf/kafka/blob/89831204/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 09ed8f5..ccf5e2e 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -17,18 +17,21 @@ package kafka.server import kafka.api._ -import kafka.cluster.{Partition, Replica} -import kafka.common.{ErrorMapping, TopicAndPartition} +import kafka.utils._ +import 
kafka.cluster.Replica +import kafka.common.TopicAndPartition import kafka.log.Log import kafka.message.{ByteBufferMessageSet, Message} -import kafka.network.RequestChannel -import kafka.utils.{ZkUtils, Time, TestUtils, MockTime} import scala.Some +import java.util.Collections +import java.util.concurrent.atomic.AtomicBoolean +import collection.JavaConversions._ import org.easymock.EasyMock import org.I0Itec.zkclient.ZkClient import org.scalatest.junit.JUnit3Suite +import junit.framework.Assert._ class SimpleFetchTest extends JUnit3Suite { @@ -37,215 +40,102 @@ class SimpleFetchTest extends JUnit3Suite { override val replicaFetchWaitMaxMs = 100 override val replicaLagMaxMessages = 10L }) - val topic = "foo" - val partitionId = 0 - /** - * The scenario for this test is that there is one topic, "test-topic", one broker "0" that has - * one partition with one follower replica on broker "1". The leader replica on "0" - * has HW of "5" and LEO of "20". The follower on broker "1" has a local replica - * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync - * but is still in ISR (hasn't yet expired from ISR). - * - * When a normal consumer fetches data, it should only see data up to the HW of the leader, - * in this case up an offset of "5". - */ - def testNonReplicaSeesHwWhenFetching() { - /* setup */ - val time = new MockTime - val leo = 20L - val hw = 5 - val fetchSize = 100 - val messages = new Message("test-message".getBytes()) + // set the replica manager with the partition + val time = new MockTime + val leaderLEO = 20L + val followerLEO = 15L + val partitionHW = 5 - // create nice mock since we don't particularly care about zkclient calls - val zkClient = EasyMock.createNiceMock(classOf[ZkClient]) - EasyMock.expect(zkClient.exists(ZkUtils.ControllerEpochPath)).andReturn(false) - EasyMock.replay(zkClient) + val fetchSize = 100 + val messagesToHW = new Message("messageToHW".getBytes()) + val messagesToLEO = new Message("messageToLEO".getBytes()) - val log = EasyMock.createMock(classOf[kafka.log.Log]) - EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes() - EasyMock.expect(log) - EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn( - new FetchDataInfo( - new LogOffsetMetadata(0L, 0L, leo.toInt), - new ByteBufferMessageSet(messages) - )).anyTimes() - EasyMock.replay(log) + val topic = "test-topic" + val partitionId = 0 + val topicAndPartition = TopicAndPartition(topic, partitionId) - val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) - EasyMock.expect(logManager.getLog(TopicAndPartition(topic, partitionId))).andReturn(Some(log)).anyTimes() - EasyMock.replay(logManager) + val fetchInfo = Collections.singletonMap(topicAndPartition, PartitionFetchInfo(0, fetchSize)).toMap - val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager]) - EasyMock.expect(replicaManager.config).andReturn(configs.head) - EasyMock.expect(replicaManager.logManager).andReturn(logManager) - EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) - EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) - EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({ - val fetchInfo = log.read(0, fetchSize, Some(hw)) - val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet) - Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset)) - }).anyTimes() - 
EasyMock.replay(replicaManager) - - val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager) - partition.getReplica(configs(1).brokerId).get.logEndOffset = new LogOffsetMetadata(leo - 5L, 0L, leo.toInt - 5) - - EasyMock.reset(replicaManager) - EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes() - EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes() - EasyMock.expect(replicaManager.initWithRequestPurgatory(EasyMock.anyObject(), EasyMock.anyObject())) - EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({ - val fetchInfo = log.read(0, fetchSize, Some(hw)) - val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet) - Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset)) - }).anyTimes() - - EasyMock.replay(replicaManager) - - val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager]) - - val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController]) - - // start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary) - // don't provide replica or leader callbacks since they will not be tested here - val requestChannel = new RequestChannel(2, 5) - val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller) - - val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo]) - apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo) - EasyMock.replay(partitionStateInfo) - // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log - val goodFetch = new FetchRequestBuilder() - .replicaId(Request.OrdinaryConsumerId) - .addFetch(topic, partitionId, 0, fetchSize) - .build() - val goodFetchBB = TestUtils.createRequestByteBuffer(goodFetch) - - // send the request - apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, buffer=goodFetchBB, startTimeMs=1)) - - // make sure the log only reads bytes between 0->HW (5) - EasyMock.verify(log) - } + var replicaManager: ReplicaManager = null - /** - * The scenario for this test is that there is one topic, "test-topic", on broker "0" that has - * one partition with one follower replica on broker "1". The leader replica on "0" - * has HW of "5" and LEO of "20". The follower on broker "1" has a local replica - * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync - * but is still in ISR (hasn't yet expired from ISR). 
- * - * When the follower from broker "1" fetches data, it should see data upto the log end offset ("20") - */ - def testReplicaSeesLeoWhenFetching() { - /* setup */ - val time = new MockTime - val leo = 20 - val hw = 5 - - val messages = new Message("test-message".getBytes()) - - val followerReplicaId = configs(1).brokerId - val followerLEO = 15 + override def setUp() { + super.setUp() + // create nice mock since we don't particularly care about zkclient calls val zkClient = EasyMock.createNiceMock(classOf[ZkClient]) - EasyMock.expect(zkClient.exists(ZkUtils.ControllerEpochPath)).andReturn(false) EasyMock.replay(zkClient) - val log = EasyMock.createMock(classOf[kafka.log.Log]) - EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes() - EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE, None)).andReturn( + // create nice mock since we don't particularly care about scheduler calls + val scheduler = EasyMock.createNiceMock(classOf[KafkaScheduler]) + EasyMock.replay(scheduler) + + // create the log which takes read with either HW max offset or none max offset + val log = EasyMock.createMock(classOf[Log]) + EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes() + EasyMock.expect(log.read(0, fetchSize, Some(partitionHW))).andReturn( new FetchDataInfo( - new LogOffsetMetadata(followerLEO, 0L, followerLEO), - new ByteBufferMessageSet(messages) + new LogOffsetMetadata(0L, 0L, 0), + new ByteBufferMessageSet(messagesToHW) + )).anyTimes() + EasyMock.expect(log.read(0, fetchSize, None)).andReturn( + new FetchDataInfo( + new LogOffsetMetadata(0L, 0L, 0), + new ByteBufferMessageSet(messagesToLEO) )).anyTimes() EasyMock.replay(log) + // create the log manager that is aware of this mock log val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) - EasyMock.expect(logManager.getLog(TopicAndPartition(topic, 0))).andReturn(Some(log)).anyTimes() + EasyMock.expect(logManager.getLog(topicAndPartition)).andReturn(Some(log)).anyTimes() EasyMock.replay(logManager) - val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager]) - EasyMock.expect(replicaManager.config).andReturn(configs.head) - EasyMock.expect(replicaManager.logManager).andReturn(logManager) - EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) - EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) - EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({ - val fetchInfo = log.read(followerLEO, Integer.MAX_VALUE, None) - val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet) - Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset)) - }).anyTimes() - EasyMock.replay(replicaManager) - - val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager) - partition.getReplica(followerReplicaId).get.logEndOffset = new LogOffsetMetadata(followerLEO.asInstanceOf[Long], 0L, followerLEO) - - EasyMock.reset(replicaManager) - EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes() - EasyMock.expect(replicaManager.updateReplicaLEOAndPartitionHW(topic, partitionId, followerReplicaId, new LogOffsetMetadata(followerLEO.asInstanceOf[Long], 0L, followerLEO))) - EasyMock.expect(replicaManager.getReplica(topic, partitionId, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId)) - 
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes() - EasyMock.expect(replicaManager.initWithRequestPurgatory(EasyMock.anyObject(), EasyMock.anyObject())) - EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({ - val fetchInfo = log.read(followerLEO, Integer.MAX_VALUE, None) - val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet) - Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset)) - }).anyTimes() - EasyMock.expect(replicaManager.unblockDelayedProduceRequests(EasyMock.anyObject())).anyTimes() - EasyMock.replay(replicaManager) - - val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager]) - - val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController]) - - val requestChannel = new RequestChannel(2, 5) - val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller) - val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo]) - apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo) - EasyMock.replay(partitionStateInfo) - - /** - * This fetch, coming from a replica, requests all data at offset "15". Because the request is coming - * from a follower, the leader should oblige and read beyond the HW. - */ - val bigFetch = new FetchRequestBuilder() - .replicaId(followerReplicaId) - .addFetch(topic, partitionId, followerLEO, Integer.MAX_VALUE) - .build() - - val fetchRequestBB = TestUtils.createRequestByteBuffer(bigFetch) - - // send the request - apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, buffer=fetchRequestBB, startTimeMs=1)) - - /** - * Make sure the log satisfies the fetch from a follower by reading data beyond the HW, mainly all bytes after - * an offset of 15 - */ - EasyMock.verify(log) - } + // create the replica manager + replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler, logManager, new AtomicBoolean(false)) + + // add the partition with two replicas, both in ISR + val partition = replicaManager.getOrCreatePartition(topic, partitionId) - private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int, - localLog: Log, leaderHW: Long, replicaManager: ReplicaManager): Partition = { - val partition = new Partition(topic, partitionId, time, replicaManager) - val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog)) + // create the leader replica with the local log + val leaderReplica = new Replica(configs(0).brokerId, partition, time, 0, Some(log)) + leaderReplica.highWatermark = new LogOffsetMetadata(partitionHW) + partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId) - val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica + // create the follower replica with defined log end offset + val followerReplica= new Replica(configs(1).brokerId, partition, time) + followerReplica.logEndOffset = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt) + + // add both of them to ISR + val allReplicas = List(leaderReplica, followerReplica) allReplicas.foreach(partition.addReplicaIfNotExists(_)) - // set in sync replicas for this partition to all the assigned replicas partition.inSyncReplicas = allReplicas.toSet - // set the leader and its hw and the hw update time - 
partition.leaderReplicaIdOpt = Some(leaderId) - leaderReplica.highWatermark = new LogOffsetMetadata(leaderHW) - partition } - private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = { - configs.filter(_.brokerId != leaderId).map { config => - new Replica(config.brokerId, partition, time) - } + override def tearDown() { + replicaManager.shutdown(false) + super.tearDown() } + /** + * The scenario for this test is that there is one topic that has one partition + * with one leader replica on broker "0" and one follower replica on broker "1" + * inside the replica manager's metadata. + * + * The leader replica on "0" has HW of "5" and LEO of "20". The follower on + * broker "1" has a local replica with a HW matching the leader's ("5") and + * LEO of "15", meaning it's not in-sync but is still in ISR (hasn't yet expired from ISR). + * + * When a fetch operation with read committed data turned on is received, the replica manager + * should only return data up to the HW of the partition; when a fetch operation with read + * committed data turned off is received, the replica manager could return data up to the LEO + * of the local leader replica's log. + */ + def testReadFromLog() { + + assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW, + replicaManager.readFromLocalLog(true, true, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message) + + assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO, + replicaManager.readFromLocalLog(true, false, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message) + } }
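
[Editorial note, not part of the patch.] The MockDelayedRequest in the RequestPurgatoryTest hunk shows the contract that replaces the old checkSatisfied/expire callbacks: a subclass overrides tryComplete(), which must either call forceComplete() and return its result or return false so the purgatory keeps watching the operation, and onComplete(), which runs exactly once regardless of whether completion came from a tryCompleteElseWatch()/checkAndComplete() call or from expiration. A minimal sketch written against that contract follows; the DelayedFlush name and the isFlushed predicate are invented for illustration and are not in the patch.

    // Sketch only: a delayed operation following the contract exercised by
    // MockDelayedRequest above (DelayedRequest lives in kafka.server).
    class DelayedFlush(delayMs: Long, isFlushed: () => Boolean) extends DelayedRequest(delayMs) {

      // Either complete the operation via forceComplete() (returning its result)
      // or return false so the purgatory keeps it on the watch lists.
      override def tryComplete(): Boolean =
        if (isFlushed()) forceComplete() else false

      // Invoked exactly once, by whichever of tryComplete() or the expiration
      // reaper wins the race to forceComplete().
      override def onComplete() {
        println("flush acknowledged or timed out")
      }
    }

    // Typical driver code: park the operation under its keys, then poke the key
    // when server state changes (as the test does with tryComplete()).
    //   purgatory.tryCompleteElseWatch(new DelayedFlush(500L, () => logIsFlushed), Array("log-0"))
    //   ...later, when a flush finishes:
    //   val completed = purgatory.checkAndComplete("log-0")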
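
[Editorial note, not part of the patch.] The rewritten SimpleFetchTest drives ReplicaManager.readFromLocalLog directly against a real ReplicaManager with a mocked Log, instead of mocking ReplicaManager behind KafkaApis. The second boolean selects between the two read paths in testReadFromLog: true bounds the read at the partition HW (the ordinary consumer case), false lets it run to the leader log's LEO (the follower case); the first flag, which both test calls set to true, restricts reads to partitions this broker currently leads. Note the arithmetic behind the fixture: with replicaLagMaxMessages = 10, a follower at LEO 15 against a leader at LEO 20 is only 5 messages behind, so it is out of sync with the leader but still legitimately in ISR. A rough caller-side sketch is below; fetchRequest is a hypothetical incoming fetch request and fetchInfo is the TopicAndPartition -> PartitionFetchInfo map built in the test's setUp, so treat the wiring as an assumption rather than the handler code from this commit.

    // Sketch only: distinguishing consumer fetches from follower fetches before
    // reading from the local log.
    val readCommittedOnly = fetchRequest.replicaId == Request.OrdinaryConsumerId

    // First flag: only serve partitions for which this broker is the leader.
    // Second flag: consumers stop at the HW, follower replicas may read up to the LEO.
    val readResults = replicaManager.readFromLocalLog(true, readCommittedOnly, fetchInfo)

    readResults.foreach { case (topicAndPartition, result) =>
      // result.info.messageSet carries whatever data was visible to this reader
      println("%s: %d bytes readable".format(topicAndPartition, result.info.messageSet.sizeInBytes))
    }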