kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject svn commit: r1332413 - in /incubator/kafka/branches/0.8: config/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/message/ core/src/main/scala/kafka/network/ core/src/main/scala/kafka/producer/ core/src/main/...
Date Mon, 30 Apr 2012 21:34:50 GMT
Author: jkreps
Date: Mon Apr 30 21:34:49 2012
New Revision: 1332413

URL: http://svn.apache.org/viewvc?rev=1332413&view=rev
Log:
KAFKA-48 Patch to add "long poll" support to fetch requests. 


Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
Modified:
    incubator/kafka/branches/0.8/config/server.properties
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/DelayedItem.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala

Modified: incubator/kafka/branches/0.8/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/config/server.properties?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/config/server.properties (original)
+++ incubator/kafka/branches/0.8/config/server.properties Mon Apr 30 21:34:49 2012
@@ -30,9 +30,11 @@ brokerid=0
 # The port the socket server listens on
 port=9092
 
-# The number of processor threads the socket server uses for receiving and answering requests. 
-# Defaults to the number of cores on the machine
-num.threads=8
+# The number of threads handling network requests
+network.threads=2
+ 
+# The number of threads doing disk I/O
+io.threads=2
 
 # The send buffer (SO_SNDBUF) used by the socket server
 socket.send.buffer=1048576

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Mon Apr 30 21:34:49 2012
@@ -76,6 +76,11 @@ case class OffsetDetail(topic: String, p
 
 object FetchRequest {
   val CurrentVersion = 1.shortValue()
+  val DefaultCorrelationId = -1
+  val DefaultClientId = ""
+  val DefaultReplicaId = -1
+  val DefaultMaxWait = 0
+  val DefaultMinBytes = 0
 
   def readFrom(buffer: ByteBuffer): FetchRequest = {
     val versionId = buffer.getShort
@@ -94,13 +99,13 @@ object FetchRequest {
 
 }
 
-case class FetchRequest( versionId: Short,
-                         correlationId: Int,
-                         clientId: String,
-                         replicaId: Int,
-                         maxWait: Int,
-                         minBytes: Int,
-                         offsetInfo: Seq[OffsetDetail] ) extends Request(RequestKeys.Fetch) {
+case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
+                        correlationId: Int = FetchRequest.DefaultCorrelationId,
+                        clientId: String = FetchRequest.DefaultClientId,
+                        replicaId: Int = FetchRequest.DefaultReplicaId,
+                        maxWait: Int = FetchRequest.DefaultMaxWait,
+                        minBytes: Int = FetchRequest.DefaultMinBytes,
+                        offsetInfo: Seq[OffsetDetail] ) extends Request(RequestKeys.Fetch) {
 
   // ensure that a topic "X" appears in at most one OffsetDetail
   def validate() {
@@ -138,13 +143,14 @@ case class FetchRequest( versionId: Shor
   def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes())
 }
 
+
 class FetchRequestBuilder() {
-  private var correlationId = -1
+  private var correlationId = FetchRequest.DefaultCorrelationId
   private val versionId = FetchRequest.CurrentVersion
-  private var clientId = ""
-  private var replicaId = -1        // sensible default
-  private var maxWait = -1          // sensible default
-  private var minBytes = -1         // sensible default
+  private var clientId = FetchRequest.DefaultClientId
+  private var replicaId = FetchRequest.DefaultReplicaId
+  private var maxWait = FetchRequest.DefaultMaxWait
+  private var minBytes = FetchRequest.DefaultMinBytes
   private val requestMap = new HashMap[String, Tuple3[Buffer[Int], Buffer[Long], Buffer[Int]]]
 
   def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Mon Apr 30 21:34:49 2012
@@ -42,7 +42,7 @@ case class PartitionData(partition: Int,
 
   def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, messages)
 
-  def getTranslatedPartition(topic: String, randomSelector: String => Int): Int = {
+  def translatePartition(topic: String, randomSelector: String => Int): Int = {
     if (partition == ProducerRequest.RandomPartition)
       return randomSelector(topic)
     else 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Mon Apr 30 21:34:49 2012
@@ -110,7 +110,7 @@ case class ProducerRequest( versionId: S
     }
   }
 
-  def getNumTopicPartitions = data.foldLeft(0)(_ + _.partitionData.length)
+  def topicPartitionCount = data.foldLeft(0)(_ + _.partitionData.length)
 
   def expectResponse = requiredAcks > 0
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Mon Apr 30 21:34:49 2012
@@ -33,6 +33,8 @@ object ConsumerConfig {
   val MaxRebalanceRetries = 4
   val AutoOffsetReset = OffsetRequest.SmallestTimeString
   val ConsumerTimeoutMs = -1
+  val MinFetchBytes = 1
+  val MaxFetchWaitMs = 3000
   val MirrorTopicsWhitelist = ""
   val MirrorTopicsBlacklist = ""
   val MirrorConsumerNumThreads = 1
@@ -52,7 +54,7 @@ class ConsumerConfig(props: Properties) 
    *  Set this explicitly for only testing purpose. */
   val consumerId: Option[String] = Option(Utils.getString(props, "consumerid", null))
 
-  /** the socket timeout for network requests */
+  /** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */
   val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", SocketTimeout)
   
   /** the socket receive buffer for network requests */
@@ -61,10 +63,6 @@ class ConsumerConfig(props: Properties) 
   /** the number of byes of messages to attempt to fetch */
   val fetchSize = Utils.getInt(props, "fetch.size", FetchSize)
   
-  /** to avoid repeatedly polling a broker node which has no new data
-      we will backoff every time we get an empty set from the broker*/
-  val fetcherBackoffMs: Long = Utils.getInt(props, "fetcher.backoff.ms", DefaultFetcherBackoffMs)
-  
   /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
   val autoCommit = Utils.getBoolean(props, "autocommit.enable", AutoCommit)
   
@@ -76,7 +74,13 @@ class ConsumerConfig(props: Properties) 
 
   /** max number of retries during rebalance */
   val maxRebalanceRetries = Utils.getInt(props, "rebalance.retries.max", MaxRebalanceRetries)
-
+  
+  /** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
+  val minFetchBytes = Utils.getInt(props, "min.fetch.bytes", MinFetchBytes)
+  
+  /** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediate satisfy min.fetch.bytes */
+  val maxFetchWaitMs = Utils.getInt(props, "max.fetch.wait.ms", MaxFetchWaitMs)
+  
   /** backoff time between retries during rebalance */
   val rebalanceBackoffMs = Utils.getInt(props, "rebalance.backoff.ms", zkSyncTimeMs)
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala Mon Apr 30 21:34:49 2012
@@ -18,6 +18,7 @@
 package kafka.consumer
 
 import java.io.IOException
+import java.nio.channels.ClosedByInterruptException
 import java.util.concurrent.CountDownLatch
 import kafka.api.{FetchRequestBuilder, OffsetRequest}
 import kafka.cluster.Broker
@@ -33,7 +34,7 @@ class FetcherRunnable(val name: String,
                       val partitionTopicInfos: List[PartitionTopicInfo])
   extends Thread(name) with Logging {
   private val shutdownLatch = new CountDownLatch(1)
-  private val simpleConsumer = new SimpleConsumer(broker.host, broker.port, config.socketTimeoutMs,
+  private val simpleConsumer = new SimpleConsumer(broker.host, broker.port, config.maxFetchWaitMs + config.socketTimeoutMs,
     config.socketBufferSize)
   @volatile
   private var stopped = false
@@ -54,19 +55,19 @@ class FetcherRunnable(val name: String,
     var reqId = 0
     try {
       while (!stopped) {
-        // TODO: fix up the max wait and min bytes
         val builder = new FetchRequestBuilder().
           correlationId(reqId).
           clientId(config.consumerId.getOrElse(name)).
-          maxWait(0).
-          minBytes(0)
+          maxWait(config.maxFetchWaitMs).
+          minBytes(config.minFetchBytes)
         partitionTopicInfos.foreach(pti =>
           builder.addFetch(pti.topic, pti.partitionId, pti.getFetchOffset(), config.fetchSize)
         )
 
         val fetchRequest = builder.build()
-        trace("fetch request: " + fetchRequest)
+        val start = System.currentTimeMillis
         val response = simpleConsumer.fetch(fetchRequest)
+        trace("Fetch completed in " + (System.currentTimeMillis - start) + " ms with max wait of " + config.maxFetchWaitMs)
 
         var read = 0L
         for(infopti <- partitionTopicInfos) {
@@ -74,7 +75,7 @@ class FetcherRunnable(val name: String,
           try {
             var done = false
             if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) {
-              info("offset for " + infopti + " out of range")
+              info("Offset for " + infopti + " out of range")
               // see if we can fix this error
               val resetOffset = resetConsumerOffsets(infopti.topic, infopti.partitionId)
               if(resetOffset >= 0) {
@@ -86,36 +87,35 @@ class FetcherRunnable(val name: String,
             if (!done)
               read += infopti.enqueue(messages, infopti.getFetchOffset)
           } catch {
-            case e1: IOException =>
+            case e: IOException =>
               // something is wrong with the socket, re-throw the exception to stop the fetcher
-              throw e1
-            case e2 =>
+              throw e
+            case e =>
               if (!stopped) {
                 // this is likely a repeatable error, log it and trigger an exception in the consumer
-                error("error in FetcherRunnable for " + infopti, e2)
-                infopti.enqueueError(e2, infopti.getFetchOffset)
+                error("Error in fetcher for " + infopti, e)
+                infopti.enqueueError(e, infopti.getFetchOffset)
               }
               // re-throw the exception to stop the fetcher
-              throw e2
+              throw e
           }
         }
         reqId = if(reqId == Int.MaxValue) 0 else reqId + 1
 
         trace("fetched bytes: " + read)
-        if(read == 0) {
-          debug("backing off " + config.fetcherBackoffMs + " ms")
-          Thread.sleep(config.fetcherBackoffMs)
-        }
       }
     } catch {
+      case e: ClosedByInterruptException => 
+        // we interrupted ourselves, close quietly
+        debug("Fetch request interrupted, exiting...")
       case e =>
-        if (stopped)
-          info("FecherRunnable " + this + " interrupted")
+        if(stopped)
+          info("Fetcher stopped...")
         else
-          error("error in FetcherRunnable ", e)
+          error("Error in fetcher ", e)
     }
 
-    info("stopping fetcher " + name + " to host " + broker.host)
+    info("Stopping fetcher " + name + " to host " + broker.host)
     Utils.swallow(logger.info, simpleConsumer.close)
     shutdownComplete()
   }
@@ -139,7 +139,7 @@ class FetcherRunnable(val name: String,
     val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
 
     // reset manually in zookeeper
-    info("updating partition " + partitionId + " for topic " + topic + " with " +
+    info("Updating partition " + partitionId + " for topic " + topic + " with " +
             (if(offset == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + offsets(0))
     ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partitionId, offsets(0).toString)
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Mon Apr 30 21:34:49 2012
@@ -66,7 +66,7 @@ class SimpleConsumer( val host: String,
         response = blockingChannel.receive()
       } catch {
         case e : java.io.IOException =>
-          info("Reconnect in due to socket error: ", e)
+          info("Reconnect due to socket error: ", e)
           // retry once
           try {
             reconnect()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Mon Apr 30 21:34:49 2012
@@ -92,7 +92,7 @@ class FileMessageSet private[kafka](priv
   /**
    * Return a message set which is a view into this set starting from the given offset and with the given size limit.
    */
-  def read(readOffset: Long, size: Long): MessageSet = {
+  def read(readOffset: Long, size: Long): FileMessageSet = {
     new FileMessageSet(channel, this.offset + readOffset, scala.math.min(this.offset + readOffset + size, highWaterMark),
       false, new AtomicBoolean(false))
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Mon Apr 30 21:34:49 2012
@@ -22,7 +22,10 @@ import java.util.concurrent._
 object RequestChannel { 
   val AllDone = new Request(1, 2, null, 0)
   case class Request(processor: Int, requestKey: Any, request: Receive, start: Long)
-  case class Response(processor: Int, requestKey: Any, response: Send, start: Long, elapsed: Long)
+  case class Response(processor: Int, requestKey: Any, response: Send, start: Long, elapsed: Long) {
+    def this(request: Request, send: Send, ellapsed: Long) = 
+      this(request.processor, request.requestKey, send, request.start, ellapsed)
+  }
 }
 
 class RequestChannel(val numProcessors: Int, val queueSize: Int) { 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala Mon Apr 30 21:34:49 2012
@@ -238,7 +238,7 @@ private[kafka] class Processor(val id: I
   private def processNewResponses() {
     var curr = requestChannel.receiveResponse(id)
     while(curr != null) {
-      trace("Socket server received response to send: " + curr)
+      trace("Socket server received response to send, registering for write: " + curr)
       val key = curr.requestKey.asInstanceOf[SelectionKey]
       try {
         key.interestOps(SelectionKey.OP_WRITE)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Mon Apr 30 21:34:49 2012
@@ -65,7 +65,7 @@ class KafkaLog4jAppender extends Appende
       throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
     if(serializerClass == null) {
       serializerClass = "kafka.serializer.StringEncoder"
-      LogLog.warn("Using default encoder - kafka.serializer.StringEncoder")
+      LogLog.debug("Using default encoder - kafka.serializer.StringEncoder")
     }
     props.put("serializer.class", serializerClass)
     val config : ProducerConfig = new ProducerConfig(props)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Mon Apr 30 21:34:49 2012
@@ -65,7 +65,7 @@ class DefaultEventHandler[K,V](config: P
     val partitionedData = partitionAndCollate(messages)
     val failedProduceRequests = new ListBuffer[ProducerData[K,Message]]
     try {
-      for ( (brokerid, eventsPerBrokerMap) <- partitionedData ) {
+      for ((brokerid, eventsPerBrokerMap) <- partitionedData) {
         if (logger.isTraceEnabled)
           eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
             .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
@@ -163,7 +163,7 @@ class DefaultEventHandler[K,V](config: P
         throw new NoLeaderForPartitionException("No leader for some partition(s) on broker %d".format(brokerId))
       if(messagesPerTopic.size > 0) {
         val topics = new HashMap[String, ListBuffer[PartitionData]]()
-        for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) {
+        for(((topicName, partitionId), messagesSet) <- messagesPerTopic) {
           topics.get(topicName) match {
             case Some(x) => trace("found " + topicName)
             case None => topics += topicName -> new ListBuffer[PartitionData]() //create a new listbuffer for this topic

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Mon Apr 30 21:34:49 2012
@@ -19,55 +19,80 @@ package kafka.server
 
 import java.io.IOException
 import java.lang.IllegalStateException
+import java.util.concurrent.atomic._
 import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
 import kafka.log._
 import kafka.message._
 import kafka.network._
 import org.apache.log4j.Logger
-import scala.collection.mutable.ListBuffer
+import scala.collection._
 import kafka.utils.{SystemTime, Logging}
-import kafka.common.{FetchRequestFormatException, ErrorMapping}
+import kafka.common._
+import scala.math._
 
 /**
  * Logic to handle the various Kafka requests
  */
-class KafkaApis(val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper) extends Logging {
+class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper) extends Logging {
   
+  private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
-  def handle(receive: Receive): Option[Send] = { 
-    val apiId = receive.buffer.getShort() 
+  /**
+   * Top-level method that handles all requests and multiplexes to the right api
+   */
+  def handle(request: RequestChannel.Request) { 
+    val apiId = request.request.buffer.getShort() 
     apiId match {
-      case RequestKeys.Produce => handleProducerRequest(receive)
-      case RequestKeys.Fetch => handleFetchRequest(receive)
-      case RequestKeys.Offsets => handleOffsetRequest(receive)
-      case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive)
+      case RequestKeys.Produce => handleProducerRequest(request)
+      case RequestKeys.Fetch => handleFetchRequest(request)
+      case RequestKeys.Offsets => handleOffsetRequest(request)
+      case RequestKeys.TopicMetadata => handleTopicMetadataRequest(request)
       case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
     }
   }
 
-  def handleProducerRequest(receive: Receive): Option[Send] = {
+  /**
+   * Handle a produce request
+   */
+  def handleProducerRequest(request: RequestChannel.Request) {
+    val produceRequest = ProducerRequest.readFrom(request.request.buffer)
     val sTime = SystemTime.milliseconds
-    val request = ProducerRequest.readFrom(receive.buffer)
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Producer request " + request.toString)
 
-    val response = handleProducerRequest(request)
+    val response = produce(produceRequest)
     debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms")
-    Some(new ProducerResponseSend(response))
+    requestChannel.sendResponse(new RequestChannel.Response(request, new ProducerResponseSend(response), -1))
+    
+    // Now check any outstanding fetches this produce just unblocked
+    var satisfied = new mutable.ArrayBuffer[DelayedFetch]
+    for(topicData <- produceRequest.data) {
+      for(partition <- topicData.partitionData)
+        satisfied ++= fetchRequestPurgatory.update((topicData.topic, partition.partition), topicData)
+    }
+    // send any newly unblocked responses
+    for(fetchReq <- satisfied) {
+       val topicData = readMessageSets(fetchReq.fetch.offsetInfo)
+       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
+       requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
+    }
   }
 
-  private def handleProducerRequest(request: ProducerRequest): ProducerResponse = {
-    val requestSize = request.getNumTopicPartitions
+  /**
+   * Helper method for handling a parsed producer request
+   */
+  private def produce(request: ProducerRequest): ProducerResponse = {
+    val requestSize = request.topicPartitionCount
     val errors = new Array[Short](requestSize)
     val offsets = new Array[Long](requestSize)
 
     var msgIndex = -1
-    for( topicData <- request.data ) {
-      for( partitionData <- topicData.partitionData ) {
+    for(topicData <- request.data) {
+      for(partitionData <- topicData.partitionData) {
         msgIndex += 1
-        val partition = partitionData.getTranslatedPartition(topicData.topic, logManager.chooseRandomPartition)
+        val partition = partitionData.translatePartition(topicData.topic, logManager.chooseRandomPartition)
         try {
           // TODO: need to handle ack's here!  Will probably move to another method.
           kafkaZookeeper.ensurePartitionOnThisBroker(topicData.topic, partition)
@@ -82,7 +107,7 @@ class KafkaApis(val logManager: LogManag
             e match {
               case _: IOException =>
                 fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
-                Runtime.getRuntime.halt(1)
+                System.exit(1)
               case _ =>
                 errors(msgIndex) = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort
                 offsets(msgIndex) = -1
@@ -93,8 +118,11 @@ class KafkaApis(val logManager: LogManag
     new ProducerResponse(ProducerResponse.CurrentVersion, request.correlationId, errors, offsets)
   }
 
-  def handleFetchRequest(request: Receive): Option[Send] = {
-    val fetchRequest = FetchRequest.readFrom(request.buffer)
+  /**
+   * Handle a fetch request
+   */
+  def handleFetchRequest(request: RequestChannel.Request) {
+    val fetchRequest = FetchRequest.readFrom(request.request.buffer)
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Fetch request " + fetchRequest.toString)
 
@@ -104,13 +132,54 @@ class KafkaApis(val logManager: LogManag
     } catch {
       case e:FetchRequestFormatException =>
         val response = new FetchResponse(FetchResponse.CurrentVersion, fetchRequest.correlationId, Array.empty)
-        return Some(new FetchResponseSend(response, ErrorMapping.InvalidFetchRequestFormatCode))
+        val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.InvalidFetchRequestFormatCode), -1)
+        requestChannel.sendResponse(channelResponse)
     }
-
-    val fetchedData = new ListBuffer[TopicData]()
-
+    
+    // if there are enough bytes available right now we can answer the request, otherwise we have to punt
+    val availableBytes = availableFetchBytes(fetchRequest)
+    if(fetchRequest.maxWait <= 0 || availableBytes >= fetchRequest.minBytes) {
+      val topicData = readMessageSets(fetchRequest.offsetInfo)
+      val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
+      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
+    } else {
+      // create a list of (topic, partition) pairs to use as keys for this delayed request
+      val keys: Seq[Any] = fetchRequest.offsetInfo.flatMap(o => o.partitions.map((o.topic, _)))
+      val delayedFetch = new DelayedFetch(keys, request, fetchRequest, fetchRequest.maxWait, availableBytes)
+      fetchRequestPurgatory.watch(delayedFetch)
+    }
+  }
+    
+  /**
+   * Calculate the number of available bytes for the given fetch request
+   */
+  private def availableFetchBytes(fetchRequest: FetchRequest): Long = {
+    var totalBytes = 0L
     for(offsetDetail <- fetchRequest.offsetInfo) {
-      val info = new ListBuffer[PartitionData]()
+      for(i <- 0 until offsetDetail.partitions.size) {
+        try {
+          val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i)) 
+          val available = maybeLog match {
+            case Some(log) => max(0, log.getHighwaterMark - offsetDetail.offsets(i))
+            case None => 0
+          }
+    	  totalBytes += math.min(offsetDetail.fetchSizes(i), available)
+        } catch {
+          case e: InvalidPartitionException => 
+            info("Invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'")
+        }
+      }
+    }
+    totalBytes
+  }
+  
+  /**
+   * Read from all the offset details given and produce an array of topic datas
+   */
+  private def readMessageSets(offsets: Seq[OffsetDetail]): Array[TopicData] = {
+    val fetchedData = new mutable.ArrayBuffer[TopicData]()
+    for(offsetDetail <- offsets) {
+      val info = new mutable.ArrayBuffer[PartitionData]()
       val topic = offsetDetail.topic
       val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
       for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
@@ -122,10 +191,12 @@ class KafkaApis(val logManager: LogManag
       }
       fetchedData.append(new TopicData(topic, info.toArray))
     }
-    val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, fetchedData.toArray )
-    Some(new FetchResponseSend(response, ErrorMapping.NoError))
+    fetchedData.toArray
   }
-
+  
+  /**
+   * Read from a single topic/partition at the given offset
+   */
   private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Int, MessageSet] = {
     var response: Either[Int, MessageSet] = null
     try {
@@ -140,22 +211,28 @@ class KafkaApis(val logManager: LogManag
     response
   }
 
-  def handleOffsetRequest(request: Receive): Option[Send] = {
-    val offsetRequest = OffsetRequest.readFrom(request.buffer)
+  /**
+   * Service the offset request API 
+   */
+  def handleOffsetRequest(request: RequestChannel.Request) {
+    val offsetRequest = OffsetRequest.readFrom(request.request.buffer)
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Offset request " + offsetRequest.toString)
     val offsets = logManager.getOffsets(offsetRequest)
     val response = new OffsetArraySend(offsets)
-    Some(response)
+    requestChannel.sendResponse(new RequestChannel.Response(request, response, -1))
   }
 
-  def handleTopicMetadataRequest(request: Receive): Option[Send] = {
-    val metadataRequest = TopicMetadataRequest.readFrom(request.buffer)
+  /**
+   * Service the topic metadata request API
+   */
+  def handleTopicMetadataRequest(request: RequestChannel.Request) {
+    val metadataRequest = TopicMetadataRequest.readFrom(request.request.buffer)
 
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Topic metadata request " + metadataRequest.toString())
 
-    val topicsMetadata = new ListBuffer[TopicMetadata]()
+    val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     val config = logManager.getServerConfig
     val zkClient = kafkaZookeeper.getZookeeperClient
     val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
@@ -181,6 +258,37 @@ class KafkaApis(val logManager: LogManag
       }
     }
     info("Sending response for topic metadata request")
-    Some(new TopicMetadataSend(topicsMetadata))
+    requestChannel.sendResponse(new RequestChannel.Response(request, new TopicMetadataSend(topicsMetadata), -1))
+  }
+  
+  /**
+   * A delayed fetch request
+   */
+  class DelayedFetch(keys: Seq[Any], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long) extends DelayedRequest(keys, request, delayMs) {
+    val bytesAccumulated = new AtomicLong(initialSize)
+   }
+
+  /**
+   * A holding pen for fetch requests waiting to be satisfied
+   */
+  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, TopicData] {
+    
+    /**
+     * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
+     */
+    def checkSatisfied(topicData: TopicData, delayedFetch: DelayedFetch): Boolean = {
+      val messageDataSize = topicData.partitionData.map(_.messages.sizeInBytes).sum
+      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageDataSize)
+      accumulatedSize >= delayedFetch.fetch.minBytes
+    }
+    
+    /**
+     * When a request expires just answer it with whatever data is present
+     */
+    def expire(delayed: DelayedFetch) {
+      val topicData = readMessageSets(delayed.fetch.offsetInfo)
+      val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
+      requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
+    }
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Mon Apr 30 21:34:49 2012
@@ -17,52 +17,38 @@
 
 package kafka.server
 
+import org.apache.log4j._
 import kafka.network._
 import kafka.utils._
 
 /**
- * Thread that answers kafka requests.
+ * A thread that answers kafka requests.
  */
-class KafkaRequestHandler(val requestChannel: RequestChannel, val handle: (Receive) => Option[Send]) extends Runnable with Logging { 
-  
+class KafkaRequestHandler(val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging { 
+     
   def run() { 
     while(true) { 
       val req = requestChannel.receiveRequest()
       trace("Processor " + Thread.currentThread.getName + " got request " + req)
       if(req == RequestChannel.AllDone)
         return
-      handle(req.request) match { 
-        case Some(send) => { 
-          val resp = new RequestChannel.Response(processor = req.processor, 
-                                                 requestKey = req.requestKey, 
-                                                 response = send,
-                                                 start = req.start,
-                                                 elapsed = -1)
-          requestChannel.sendResponse(resp)
-          trace("Processor " + Thread.currentThread.getName + " sent response " + resp)
-        }
-        case None =>
-      }
+      apis.handle(req)
     }
   }
 
-  def shutdown() {
-    requestChannel.sendRequest(RequestChannel.AllDone)
-  }
+  def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
   
 }
 
-/**
- * Pool of request handling threads.
- */
-class KafkaRequestHandlerPool(val requestChannel: RequestChannel, val handler: (Receive) => Option[Send], numThreads: Int) { 
+class KafkaRequestHandlerPool(val requestChannel: RequestChannel, 
+                              val apis: KafkaApis, 
+                              numThreads: Int) { 
   
   val threads = new Array[Thread](numThreads)
   val runnables = new Array[KafkaRequestHandler](numThreads)
   for(i <- 0 until numThreads) { 
-    runnables(i) = new KafkaRequestHandler(requestChannel, handler)
-    threads(i) = new Thread(runnables(i), "kafka-request-handler-" + i)
-    threads(i).setDaemon(true)
+    runnables(i) = new KafkaRequestHandler(requestChannel, apis)
+    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
     threads(i).start()
   }
   

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Mon Apr 30 21:34:49 2012
@@ -70,8 +70,8 @@ class KafkaServer(val config: KafkaConfi
 
     kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica)
 
-    requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel,
-      new KafkaApis(logManager, kafkaZookeeper).handle, config.numIoThreads)
+    val apis = new KafkaApis(socketServer.requestChannel, logManager, kafkaZookeeper)
+    requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
     socketServer.startup
 
     Mx4jLoader.maybeLoad

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala?rev=1332413&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala Mon Apr 30 21:34:49 2012
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import scala.collection._
+import java.util.LinkedList
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import kafka.api._
+import kafka.network._
+import kafka.utils._
+
+/**
+ * A request whose processing needs to be delayed for at most the given delayMs
+ * The associated keys are used for bookeeping, and represent the "trigger" that causes this request to check if it is satisfied,
+ * for example a key could be a (topic, partition) pair.
+ */
+class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) {
+  val satisfied = new AtomicBoolean(false)
+}
+
+/**
+ * A helper class for dealing with asynchronous requests with a timeout. A DelayedRequest has a request to delay
+ * and also a list of keys that can trigger the action. Implementations can add customized logic to control what it means for a given
+ * request to be satisfied. For example it could be that we are waiting for user-specified number of acks on a given (topic, partition)
+ * to be able to respond to a request or it could be that we are waiting for a given number of bytes to accumulate on a given request
+ * to be able to respond to that request (in the simple case we might wait for at least one byte to avoid busy waiting).
+ * 
+ * For us the key is generally a (topic, partition) pair.
+ * By calling 
+ *   watch(delayedRequest) 
+ * we will add triggers for each of the given keys. It is up to the user to then call
+ *   val satisfied = update(key, request) 
+ * when a request relevant to the given key occurs. This triggers bookeeping logic and returns back any requests satisfied by this
+ * new request.
+ * 
+ * An implementation provides extends two helper functions
+ *   def checkSatisfied(request: R, delayed: T): Boolean
+ * this function returns true if the given request (in combination with whatever previous requests have happened) satisfies the delayed
+ * request delayed. This method will likely also need to do whatever bookkeeping is necessary.
+ * 
+ * The second function is
+ *   def expire(delayed: T)
+ * this function handles delayed requests that have hit their time limit without being satisfied.
+ * 
+ */
+abstract class RequestPurgatory[T <: DelayedRequest, R] {
+  
+  /* a list of requests watching each key */
+  private val watchersForKey = new ConcurrentHashMap[Any, Watchers]
+  
+  /* background thread expiring requests that have been waiting too long */
+  private val expiredRequestReaper = new ExpiredRequestReaper
+  private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
+  expirationThread.start()
+  
+  /**
+   * Add a new delayed request watching the contained keys
+   */
+  def watch(delayedRequest: T) {
+    for(key <- delayedRequest.keys) {
+      var lst = watchersFor(key)
+      lst.add(delayedRequest)
+    }
+    expiredRequestReaper.enqueue(delayedRequest)
+  }
+  
+  /**
+   * Update any watchers and return a list of newly satisfied requests.
+   */
+  def update(key: Any, request: R): Seq[T] = {
+    val w = watchersForKey.get(key)
+    if(w == null)
+      Seq.empty
+    else
+      w.collectSatisfiedRequests(request)
+  }
+  
+  private def watchersFor(key: Any): Watchers = {
+    var lst = watchersForKey.get(key)
+    if(lst == null) {
+      watchersForKey.putIfAbsent(key, new Watchers)
+      lst = watchersForKey.get(key)
+    }
+    lst
+  }
+  
+  /**
+   * Check if this request satisfied this delayed request
+   */
+  protected def checkSatisfied(request: R, delayed: T): Boolean
+  
+  /**
+   * Handle an expired delayed request
+   */
+  protected def expire(delayed: T)
+  
+  /**
+   * Shutdown the expirey thread
+   */
+  def shutdown() {
+    expiredRequestReaper.shutdown()
+  }
+  
+  /**
+   * A linked list of DelayedRequests watching some key with some associated bookeeping logic
+   */
+  private class Watchers {
+    
+    /* a few magic parameters to help do cleanup to avoid accumulating old watchers */
+    private val CleanupThresholdSize = 100
+    private val CleanupThresholdPrct = 0.5
+    
+    private val requests = new LinkedList[T]
+    
+    /* you can only change this if you have added something or marked something satisfied */
+    var liveCount = 0.0
+  
+    def add(t: T) {
+      synchronized {
+        requests.add(t)
+        liveCount += 1
+        maybePurge()
+      }
+    }
+    
+    private def maybePurge() {
+      if(requests.size > CleanupThresholdSize && liveCount / requests.size < CleanupThresholdPrct) {
+        val iter = requests.iterator()
+        while(iter.hasNext) {
+          val curr = iter.next
+          if(curr.satisfied.get())
+            iter.remove()
+        }
+      }
+    }
+  
+    def decLiveCount() {
+      synchronized {
+        liveCount -= 1
+      }
+    }
+    
+    def collectSatisfiedRequests(request: R): Seq[T] = {
+      val response = new mutable.ArrayBuffer[T]
+      synchronized {
+        val iter = requests.iterator()
+        while(iter.hasNext) {
+          val curr = iter.next
+          if(curr.satisfied.get) {
+            // another thread has satisfied this request, remove it
+            iter.remove()
+          } else {
+            if(checkSatisfied(request, curr)) {
+              iter.remove()
+              val updated = curr.satisfied.compareAndSet(false, true)
+              if(updated == true) {
+                response += curr
+                liveCount -= 1
+                expiredRequestReaper.satisfyRequest()
+              }
+            }
+          }
+        }
+      }
+      response
+    }
+  }
+  
+  /**
+   * Runnable to expire requests that have sat unfullfilled past their deadline
+   */
+  private class ExpiredRequestReaper extends Runnable with Logging {
+    
+    /* a few magic parameters to help do cleanup to avoid accumulating old watchers */
+    private val CleanupThresholdSize = 100
+    private val CleanupThresholdPrct = 0.5
+    
+    private val delayed = new DelayQueue[T]
+    private val running = new AtomicBoolean(true)
+    private val shutdownLatch = new CountDownLatch(1)
+    private val needsPurge = new AtomicBoolean(false)
+    /* The count of elements in the delay queue that are unsatisfied */
+    private val unsatisfied = new AtomicInteger(0)
+    
+    /** Main loop for the expiry thread */
+    def run() {
+      while(running.get) {
+        try {
+          val curr = pollExpired()
+          expire(curr)
+        } catch {
+          case ie: InterruptedException => 
+            if(needsPurge.getAndSet(false)) {
+              val purged = purgeSatisfied()
+              debug("Forced purge of " + purged + " requests from delay queue.")
+            }
+          case e: Exception => 
+            error("Error in long poll expiry thread: ", e)
+        }
+      }
+      shutdownLatch.countDown()
+    }
+    
+    /** Add a request to be expired */
+    def enqueue(t: T) {
+      delayed.add(t)
+      unsatisfied.incrementAndGet()
+      if(unsatisfied.get > CleanupThresholdSize && unsatisfied.get / delayed.size.toDouble < CleanupThresholdPrct)
+        forcePurge()
+    }
+    
+    private def forcePurge() {
+      needsPurge.set(true)
+      expirationThread.interrupt()
+    }
+    
+    /** Shutdown the expiry thread*/
+    def shutdown() {
+      debug("Shutting down request expiry thread")
+      running.set(false)
+      expirationThread.interrupt()
+      shutdownLatch.await()
+    }
+    
+    /** Record the fact that we satisfied a request in the stats for the expiry queue */
+    def satisfyRequest(): Unit = unsatisfied.getAndDecrement()
+    
+    /**
+     * Get the next expired event
+     */
+    private def pollExpired(): T = {
+      while(true) {
+        val curr = delayed.take()
+        val updated = curr.satisfied.compareAndSet(false, true)
+        if(updated) {
+          unsatisfied.getAndDecrement()
+          for(key <- curr.keys)
+            watchersFor(key).decLiveCount()
+          return curr
+        }
+      }
+      throw new RuntimeException("This should not happen")
+    }
+  
+    /**
+     * Delete all expired events from the delay queue
+     */
+    private def purgeSatisfied(): Int = {
+      var purged = 0
+      val iter = delayed.iterator()
+      while(iter.hasNext) {
+        val curr = iter.next()
+        if(curr.satisfied.get) {
+          iter.remove()
+          purged += 1
+        }
+      }
+      purged
+    }
+  }
+  
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/DelayedItem.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/DelayedItem.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/DelayedItem.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/DelayedItem.scala Mon Apr 30 21:34:49 2012
@@ -39,7 +39,7 @@ class DelayedItem[T](val item: T, delay:
   def compareTo(d: Delayed): Int = {
     val delayed = d.asInstanceOf[DelayedItem[T]]
     val myEnd = createdMs + delayMs
-    val yourEnd = delayed.createdMs - delayed.delayMs
+    val yourEnd = delayed.createdMs + delayed.delayMs
     
     if(myEnd < yourEnd) -1
     else if(myEnd > yourEnd) 1

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala Mon Apr 30 21:34:49 2012
@@ -23,7 +23,6 @@ import java.lang.IllegalStateException
 
 /**
  * A scheduler for running jobs in the background
- * TODO: ScheduledThreadPoolExecutor notriously swallows exceptions
  */
 class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon: Boolean) extends Logging {
   private val threadId = new AtomicLong(0)
@@ -32,11 +31,7 @@ class KafkaScheduler(val numThreads: Int
 
   def startUp = {
     executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() {
-      def newThread(runnable: Runnable): Thread = {
-        val t = new Thread(runnable, baseThreadName + threadId.getAndIncrement)
-        t.setDaemon(isDaemon)
-        t
-      }
+      def newThread(runnable: Runnable): Thread = Utils.daemonThread(baseThreadName + threadId.getAndIncrement, runnable)
     })
     executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
     executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
@@ -57,12 +52,12 @@ class KafkaScheduler(val numThreads: Int
   def shutdownNow() {
     checkIfExecutorHasStarted
     executor.shutdownNow()
-    info("force shutdown scheduler " + baseThreadName)
+    info("Forcing shutdown of scheduler " + baseThreadName)
   }
 
   def shutdown() {
     checkIfExecutorHasStarted
     executor.shutdown()
-    info("shutdown scheduler " + baseThreadName)
+    info("Shutdown scheduler " + baseThreadName)
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Mon Apr 30 21:34:49 2012
@@ -97,6 +97,11 @@ object Utils extends Logging {
   def newThread(name: String, runnable: Runnable, daemon: Boolean): Thread = {
     val thread = new Thread(runnable, name) 
     thread.setDaemon(daemon)
+    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+      def uncaughtException(t: Thread, e: Throwable) {
+        error("Uncaught exception in thread '" + t.getName + "':", e)
+      } 
+    })
     thread
   }
    

Modified: incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties (original)
+++ incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties Mon Apr 30 21:34:49 2012
@@ -18,7 +18,7 @@ log4j.appender.stdout=org.apache.log4j.C
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=ERROR
+log4j.logger.kafka=INFO
 
 # zkclient can be verbose, during debugging it is common to adjust is separately
 log4j.logger.org.I0Itec.zkclient.ZkClient=WARN

Added: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala?rev=1332413&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala Mon Apr 30 21:34:49 2012
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import java.util.Properties
+import kafka.consumer._
+import kafka.producer._
+import kafka.message._
+
+object TestEndToEndLatency {
+  def main(args: Array[String]) {
+    if(args.length != 2) {
+      System.err.println("USAGE: java " + getClass().getName + " zookeeper_connect num_messages")
+      System.exit(1)
+    }
+    
+    val zkConnect = args(0)
+    val numMessages = args(1).toInt
+    val topic = "test"
+    
+    val consumerProps = new Properties()
+    consumerProps.put("groupid", topic)
+    consumerProps.put("auto.commit", "true")
+    consumerProps.put("autooffset.reset", "largest")
+    consumerProps.put("zk.connect", zkConnect)
+    consumerProps.put("socket.timeout.ms", 1201000.toString)
+    
+    val config = new ConsumerConfig(consumerProps)
+    val connector = Consumer.create(config)
+    var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
+    val iter = stream.iterator
+
+    val producerProps = new Properties()
+    producerProps.put("zk.connect", zkConnect)
+    producerProps.put("producer.type", "sync")
+    val producer = new Producer[Any, Any](new ProducerConfig(producerProps))
+    
+    val message = new Message("hello there beautiful".getBytes)
+    var totalTime = 0.0
+    var totalSize = 0L
+    for(i <- 0 until numMessages) {
+      var begin = System.nanoTime
+      producer.send(new ProducerData(topic, message))
+      val received = iter.next
+      val ellapsed = System.nanoTime - begin
+      // poor man's progress bar
+      if(i % 10000 == 0)
+        println(i + "\t" + ellapsed / 1000.0 / 1000.0)
+      totalTime += ellapsed
+      totalSize += received.size
+    }
+    println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0)) + "ms"
+    producer.close()
+    connector.shutdown()
+    System.exit(0)
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Mon Apr 30 21:34:49 2012
@@ -35,6 +35,7 @@ import kafka.utils.TestUtils._
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
+  val RebalanceBackoffMs = 5000
   var dirs : ZKGroupTopicDirs = null
   val zookeeperConnect = TestZKUtils.zookeeperConnect
   val numNodes = 2
@@ -63,7 +64,7 @@ class ZookeeperConsumerConnectorTest ext
   }
 
   def testBasic() {
-    val requestHandlerLogger = Logger.getLogger(classOf[KafkaApis])
+    val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // test consumer timeout logic
@@ -117,8 +118,9 @@ class ZookeeperConsumerConnectorTest ext
     zkConsumerConnector1.commitOffsets
 
     // create a consumer
-    val consumerConfig2 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer2))
+    val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2)) {
+      override val rebalanceBackoffMs = RebalanceBackoffMs
+    }
     val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> 1))
     // send some messages to each broker
@@ -202,8 +204,9 @@ class ZookeeperConsumerConnectorTest ext
     zkConsumerConnector1.commitOffsets
 
     // create a consumer
-    val consumerConfig2 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer2))
+    val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2)) {
+      override val rebalanceBackoffMs = RebalanceBackoffMs
+    }
     val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> 1))
     // send some messages to each broker
@@ -300,7 +303,7 @@ class ZookeeperConsumerConnectorTest ext
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
     val expected_2 = List( ("0", "group1_consumer0-0"),
                            ("1", "group1_consumer0-0"))
-   assertEquals(expected_2, actual_2)
+    assertEquals(expected_2, actual_2)
 
     zkConsumerConnector1.shutdown
 
@@ -372,7 +375,7 @@ class ZookeeperConsumerConnectorTest ext
     // also check partition ownership
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
     val expected_1 = List( ("0", "group1_consumer1-0"))
-   assertEquals(expected_1, actual_1)
+    assertEquals(expected_1, actual_1)
 
     val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
     assertEquals(nMessages, receivedMessages1.size)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala Mon Apr 30 21:34:49 2012
@@ -34,11 +34,11 @@ class AutoOffsetResetTest extends JUnit3
   val topic = "test_topic"
   val group = "default_group"
   val testConsumer = "consumer"
-  val brokerPort = 9892
-  val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, brokerPort)))
-  val numMessages = 10
-  val largeOffset = 10000
-  val smallOffset = -1
+  val BrokerPort = 9892
+  val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, BrokerPort)))
+  val NumMessages = 10
+  val LargeOffset = 10000
+  val SmallOffset = -1
   
   val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
 
@@ -55,165 +55,55 @@ class AutoOffsetResetTest extends JUnit3
     super.tearDown
   }
   
-  def testEarliestOffsetResetForward() = {
-    val producer: Producer[String, Message] = TestUtils.createProducer(zkConnect)
-
-    for(i <- 0 until numMessages) {
-      producer.send(new ProducerData[String, Message](topic, topic, new Message("test".getBytes())))
-    }
-
-    // update offset in zookeeper for consumer to jump "forward" in time
-    val dirs = new ZKGroupTopicDirs(group, topic)
-    var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
-    consumerProps.put("autooffset.reset", "smallest")
-    consumerProps.put("consumer.timeout.ms", "2000")
-    val consumerConfig = new ConsumerConfig(consumerProps)
+  def testResetToEarliestWhenOffsetTooHigh() = 
+    assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", LargeOffset))
+  
+  def testResetToEarliestWhenOffsetTooLow() =
+    assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", SmallOffset))
     
-    TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0", largeOffset)
-    info("Updated consumer offset to " + largeOffset)
-
-    Thread.sleep(500)
-    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
-    val messageStreams = consumerConnector.createMessageStreams(Map(topic -> 1))
-
-    var threadList = List[Thread]()
-    val nMessages : AtomicInteger = new AtomicInteger(0)
-    for ((topic, streamList) <- messageStreams)
-      for (i <- 0 until streamList.length)
-        threadList ::= new Thread("kafka-zk-consumer-" + i) {
-          override def run() {
-
-            try {
-              for (message <- streamList(i)) {
-                nMessages.incrementAndGet
-              }
-            }
-            catch {
-              case te: ConsumerTimeoutException => info("Consumer thread timing out..")
-              case _: InterruptedException => 
-              case _: ClosedByInterruptException =>
-              case e => throw e
-            }
-          }
-
-        }
-
-
-    for (thread <- threadList)
-      thread.start
-
-    threadList(0).join(2000)
-
-    info("Asserting...")
-    assertEquals(numMessages, nMessages.get)
-    consumerConnector.shutdown
-  }
-
-  def testEarliestOffsetResetBackward() = {
+  def testResetToLatestWhenOffsetTooHigh() = 
+    assertEquals(0, resetAndConsume(NumMessages, "largest", LargeOffset))
+    
+  def testResetToLatestWhenOffsetTooLow() = 
+    assertEquals(0, resetAndConsume(NumMessages, "largest", SmallOffset))
+  
+  /* Produce the given number of messages, create a consumer with the given offset policy, 
+   * then reset the offset to the given value and consume until we get no new messages. 
+   * Returns the count of messages received.
+   */
+  def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
     val producer: Producer[String, Message] = TestUtils.createProducer(zkConnect)
 
-    for(i <- 0 until numMessages) {
+    for(i <- 0 until numMessages)
       producer.send(new ProducerData[String, Message](topic, topic, new Message("test".getBytes())))
-    }
 
     // update offset in zookeeper for consumer to jump "forward" in time
     val dirs = new ZKGroupTopicDirs(group, topic)
     var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
-    consumerProps.put("autooffset.reset", "smallest")
+    consumerProps.put("autooffset.reset", resetTo)
     consumerProps.put("consumer.timeout.ms", "2000")
+    consumerProps.put("max.fetch.wait.ms", "0")
     val consumerConfig = new ConsumerConfig(consumerProps)
 
-    TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0", smallOffset)
-    info("Updated consumer offset to " + smallOffset)
-
-
+    TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0", offset)
+    info("Updated consumer offset to " + offset)
+    
     val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
-    val messageStreams = consumerConnector.createMessageStreams(Map(topic -> 1))
+    val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head
 
-    var threadList = List[Thread]()
-    val nMessages : AtomicInteger = new AtomicInteger(0)
-    for ((topic, streamList) <- messageStreams)
-      for (i <- 0 until streamList.length)
-        threadList ::= new Thread("kafka-zk-consumer-" + i) {
-          override def run() {
-
-            try {
-              for (message <- streamList(i)) {
-                nMessages.incrementAndGet
-              }
-            }
-            catch {
-              case _: InterruptedException =>
-              case _: ClosedByInterruptException =>
-              case e => throw e
-            }
-          }
-
-        }
-
-
-    for (thread <- threadList)
-      thread.start
-
-    threadList(0).join(2000)
-
-    assertEquals(numMessages, nMessages.get)
-    consumerConnector.shutdown
-  }
-
-  def testLatestOffsetResetForward() = {
-    val producer: Producer[String, Message] = TestUtils.createProducer(zkConnect)
-
-    for(i <- 0 until numMessages) {
-      producer.send(new ProducerData[String, Message](topic, topic, new Message("test".getBytes())))
+    var received = 0
+    val iter = messageStream.iterator
+    try {
+      for (i <- 0 until numMessages) {
+        iter.next // will throw a timeout exception if the message isn't there
+        received += 1
+      }
+    } catch {
+      case e: ConsumerTimeoutException => 
+        info("consumer timed out after receiving " + received + " messages.")
     }
-
-    // update offset in zookeeper for consumer to jump "forward" in time
-    val dirs = new ZKGroupTopicDirs(group, topic)
-    var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
-    consumerProps.put("autooffset.reset", "largest")
-    consumerProps.put("consumer.timeout.ms", "2000")
-    val consumerConfig = new ConsumerConfig(consumerProps)
-
-    TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0", largeOffset)
-    info("Updated consumer offset to " + largeOffset)
-
-
-    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
-    val messageStreams = consumerConnector.createMessageStreams(Map(topic -> 1))
-
-    var threadList = List[Thread]()
-    val nMessages : AtomicInteger = new AtomicInteger(0)
-    for ((topic, streamList) <- messageStreams)
-      for (i <- 0 until streamList.length)
-        threadList ::= new Thread("kafka-zk-consumer-" + i) {
-          override def run() {
-
-            try {
-              for (message <- streamList(i)) {
-                nMessages.incrementAndGet
-              }
-            }
-            catch {
-              case _: InterruptedException =>
-              case _: ClosedByInterruptException =>
-              case e => throw e
-            }
-          }
-
-        }
-
-
-    for (thread <- threadList)
-      thread.start
-
-    threadList(0).join(2000)
-
-    info("Asserting...")
-
-    assertEquals(0, nMessages.get)
     consumerConnector.shutdown
+    received
   }
-
   
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Mon Apr 30 21:34:49 2012
@@ -82,8 +82,7 @@ class PrimitiveApiTest extends JUnit3Sui
       new OffsetDetail("topic1", Array(3, 4, 5), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
       new OffsetDetail("topic2", Array(3, 4, 5), Array(0L, 0L, 0L), Array(1000, 1000, 1000))
     )
-    val request = new FetchRequest( versionId = FetchRequest.CurrentVersion, correlationId = 0, clientId = "",
-                                    replicaId = -1, maxWait = -1, minBytes = -1, offsetInfo = offsets)
+    val request = new FetchRequest(offsetInfo = offsets)
     try {
       consumer.fetch(request)
       fail("FetchRequest should throw FetchRequestFormatException due to duplicate topics")
@@ -172,7 +171,7 @@ class PrimitiveApiTest extends JUnit3Sui
     {
       // send some invalid offsets
       val builder = new FetchRequestBuilder()
-      for( (topic, partition) <- topics)
+      for((topic, partition) <- topics)
         builder.addFetch(topic, partition, -1, 10000)
 
       try {
@@ -189,7 +188,7 @@ class PrimitiveApiTest extends JUnit3Sui
     {
       // send some invalid partitions
       val builder = new FetchRequestBuilder()
-      for( (topic, partition) <- topics)
+      for((topic, partition) <- topics)
         builder.addFetch(topic, -1, 0, 10000)
 
       try {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Mon Apr 30 21:34:49 2012
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer
 import kafka.log.LogManager
 import junit.framework.Assert._
 import org.easymock.EasyMock
-import kafka.network.BoundedByteBufferReceive
+import kafka.network._
 import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
 import kafka.cluster.Broker
 import kafka.server.{KafkaZooKeeper, KafkaApis, KafkaConfig}
@@ -93,7 +93,8 @@ class TopicMetadataTest extends JUnit3Su
     serializedMetadataRequest.rewind()
 
     // create the kafka request handler
-    val kafkaRequestHandler = new KafkaApis(logManager, kafkaZookeeper)
+    val requestChannel = new RequestChannel(2, 5)
+    val apis = new KafkaApis(requestChannel, logManager, kafkaZookeeper)
 
     // mock the receive API to return the request buffer as created above
     val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
@@ -101,23 +102,18 @@ class TopicMetadataTest extends JUnit3Su
     EasyMock.replay(receivedRequest)
 
     // call the API (to be tested) to get metadata
-    val metadataResponse = kafkaRequestHandler.handleTopicMetadataRequest(receivedRequest)
-
-    // verify the topic metadata returned
-    metadataResponse match {
-      case Some(metadata) =>
-        val responseBuffer = metadata.asInstanceOf[TopicMetadataSend].metadata
-        val topicMetadata = TopicMetadataRequest.deserializeTopicsMetadataResponse(responseBuffer)
-        assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
-        assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
-        val partitionMetadata = topicMetadata.head.partitionsMetadata
-        assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
-        assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-        assertEquals(brokers, partitionMetadata.head.replicas)
-        assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
-      case None =>
-        fail("Metadata response expected")
-    }
+    apis.handleTopicMetadataRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1))
+    val metadataResponse = requestChannel.receiveResponse(0).response.asInstanceOf[TopicMetadataSend].metadata
+    
+    // check assertions
+    val topicMetadata = TopicMetadataRequest.deserializeTopicsMetadataResponse(metadataResponse)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
+    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
+    val partitionMetadata = topicMetadata.head.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+    assertEquals(brokers, partitionMetadata.head.replicas)
+    assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
 
     // verify the expected calls to log manager occurred in the right order
     EasyMock.verify(logManager)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Mon Apr 30 21:34:49 2012
@@ -46,7 +46,7 @@ class ZookeeperConsumerConnectorTest ext
   val nMessages = 2
 
   def testBasic() {
-    val requestHandlerLogger = Logger.getLogger(classOf[KafkaApis])
+    val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
     var actualMessages: List[Message] = Nil
 

Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala?rev=1332413&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala Mon Apr 30 21:34:49 2012
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import scala.collection._
+import org.junit.{After, Before, Test}
+import junit.framework.Assert._
+import kafka.server._
+import kafka.message._
+import kafka.api._
+import kafka.utils.TestUtils
+
+class RequestPurgatoryTest {
+
+  val producerRequest1 = TestUtils.produceRequest("test", new ByteBufferMessageSet(new Message("hello1".getBytes)))
+  val producerRequest2 = TestUtils.produceRequest("test", new ByteBufferMessageSet(new Message("hello2".getBytes)))
+  var purgatory: MockRequestPurgatory = null
+  
+  @Before
+  def setup() {
+    purgatory = new MockRequestPurgatory()
+  }
+  
+  @After
+  def teardown() {
+    purgatory.shutdown()
+  }
+  
+  @Test
+  def testRequestSatisfaction() {
+    val r1 = new DelayedRequest(Array("test1"), null, 100000L)
+    val r2 = new DelayedRequest(Array("test2"), null, 100000L)
+    assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1", producerRequest1).size)
+    purgatory.watch(r1)
+    assertEquals("Still nothing satisfied", 0, purgatory.update("test1", producerRequest1).size)
+    purgatory.watch(r2)
+    assertEquals("Still nothing satisfied", 0, purgatory.update("test2", producerRequest2).size)
+    purgatory.satisfied += r1
+    assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1", producerRequest1))
+    assertEquals("Nothing satisfied", 0, purgatory.update("test1", producerRequest2).size)
+    purgatory.satisfied += r2
+    assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2", producerRequest2))
+    assertEquals("Nothing satisfied", 0, purgatory.update("test2", producerRequest2).size)  
+  }
+
+  @Test
+  def testRequestExpirey() {
+    val expiration = 20L
+    val r1 = new DelayedRequest(Array("test1"), null, expiration)
+    val r2 = new DelayedRequest(Array("test1"), null, 200000L)
+    val start = System.currentTimeMillis
+    purgatory.watch(r1)
+    purgatory.watch(r2)
+    purgatory.awaitExpiration(r1)
+    val ellapsed = System.currentTimeMillis - start
+    println(ellapsed)
+    assertTrue("r1 expired", purgatory.expired.contains(r1))
+    assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2))
+    assertTrue("Time for expiration was about 20ms", (ellapsed - expiration).abs < 10L)
+  }
+  
+  class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] {
+    val satisfied = mutable.Set[DelayedRequest]()
+    val expired = mutable.Set[DelayedRequest]()
+    def awaitExpiration(delayed: DelayedRequest) = {
+      delayed synchronized {
+        delayed.wait()
+      }
+    }
+    def checkSatisfied(request: ProducerRequest, delayed: DelayedRequest): Boolean = satisfied.contains(delayed)
+    def expire(delayed: DelayedRequest) {
+      expired += delayed
+      delayed synchronized {
+        delayed.notify()
+      }
+    }
+  }
+  
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1332413&r1=1332412&r2=1332413&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Mon Apr 30 21:34:49 2012
@@ -416,6 +416,7 @@ object TestUtils extends Logging {
       leaderLock.unlock()
     }
   }
+
 }
 
 object TestZKUtils {



Mime
View raw message