kafka-commits mailing list archives

From jjko...@apache.org
Subject svn commit: r1355578 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/api/ main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/javaapi/ main/scala/kafka/log/ main/scala/kafka/network/ main/scala/kafka/producer/ main/scala/...
Date Fri, 29 Jun 2012 22:06:05 GMT
Author: jjkoshy
Date: Fri Jun 29 22:05:57 2012
New Revision: 1355578

URL: http://svn.apache.org/viewvc?rev=1355578&view=rev
Log:
KAFKA-353 Tie producer-side ack with high watermark and progress of replicas; patched by Joel Koshy; reviewed by Jun Rao, Jay Kreps

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/RequestTimedOutException.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/DelayedItem.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Throttler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Fri Jun 29 22:05:57 2012
@@ -78,6 +78,7 @@ object FetchRequest {
   val DefaultCorrelationId = -1
   val DefaultClientId = ""
   val DefaultReplicaId = -1
+  val NonFollowerId = DefaultReplicaId
   val DefaultMaxWait = 0
   val DefaultMinBytes = 0
 

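The new NonFollowerId alias simply gives the sentinel replica id -1 a descriptive name: it marks a fetch issued by an external client rather than by a follower broker. A minimal standalone sketch of that check (FetchSentinel is illustrative, not part of this patch):

    object FetchSentinel {
      val NonFollowerId = -1

      def isFollowerFetch(replicaId: Int): Boolean = replicaId != NonFollowerId

      def main(args: Array[String]): Unit = {
        println(isFollowerFetch(NonFollowerId)) // false: an external consumer
        println(isFollowerFetch(2))             // true: a fetch from broker 2's replica
      }
    }
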
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Fri Jun 29 22:05:57 2012
@@ -103,15 +103,15 @@ object TopicData {
   }
 }
 
-case class TopicData(topic: String, partitionData: Array[PartitionData]) {
-  val sizeInBytes = 2 + topic.length + partitionData.foldLeft(4)(_ + _.sizeInBytes)
+case class TopicData(topic: String, partitionDataArray: Array[PartitionData]) {
+  val sizeInBytes = 2 + topic.length + partitionDataArray.foldLeft(4)(_ + _.sizeInBytes)
 
   // need to override equals due to broken java-arrays equals functionality
   override def equals(other: Any): Boolean = {
     other match {
       case that: TopicData =>
         ( topic == that.topic &&
-          partitionData.toSeq == that.partitionData.toSeq )
+          partitionDataArray.toSeq == that.partitionDataArray.toSeq )
       case _ => false
     }
   }
@@ -124,11 +124,11 @@ class TopicDataSend(val topicData: Topic
 
   private val buffer = ByteBuffer.allocate(2 + topicData.topic.length() + 4)
   Utils.writeShortString(buffer, topicData.topic, "UTF-8")
-  buffer.putInt(topicData.partitionData.length)
+  buffer.putInt(topicData.partitionDataArray.length)
   buffer.rewind()
 
-  val sends = new MultiSend(topicData.partitionData.map(new PartitionDataSend(_)).toList) {
-    val expectedBytesToWrite = topicData.partitionData.foldLeft(0)(_ + _.sizeInBytes)
+  val sends = new MultiSend(topicData.partitionDataArray.map(new PartitionDataSend(_)).toList) {
+    val expectedBytesToWrite = topicData.partitionDataArray.foldLeft(0)(_ + _.sizeInBytes)
   }
 
   def complete = sent >= size
@@ -175,7 +175,7 @@ case class FetchResponse(versionId: Shor
   def messageSet(topic: String, partition: Int): ByteBufferMessageSet = {
     val messageSet = topicMap.get(topic) match {
       case Some(topicData) =>
-        TopicData.findPartition(topicData.partitionData, partition).map(_.messages).getOrElse(MessageSet.Empty)
+        TopicData.findPartition(topicData.partitionDataArray, partition).map(_.messages).getOrElse(MessageSet.Empty)
       case None =>
         MessageSet.Empty
     }
@@ -185,7 +185,7 @@ case class FetchResponse(versionId: Shor
   def highWatermark(topic: String, partition: Int): Long = {
     topicMap.get(topic) match {
       case Some(topicData) =>
-        TopicData.findPartition(topicData.partitionData, partition).map(_.hw).getOrElse(-1L)
+        TopicData.findPartition(topicData.partitionDataArray, partition).map(_.hw).getOrElse(-1L)
       case None => -1L
     }
   }
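
The partitionData -> partitionDataArray rename above leaves the equals override in place, and the override is genuinely needed: Java arrays compare by reference, so two TopicData instances with identical contents would otherwise be unequal. A quick standalone demonstration of the pitfall:

    object ArrayEqualsDemo {
      def main(args: Array[String]): Unit = {
        val a = Array(1, 2, 3)
        val b = Array(1, 2, 3)
        println(a == b)             // false: Java arrays compare by reference
        println(a.toSeq == b.toSeq) // true: Seq compares element by element
      }
    }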

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala Fri Jun 29 22:05:57 2012
@@ -58,7 +58,7 @@ object LeaderAndISRRequest {
     val versionId = buffer.getShort
     val clientId = Utils.readShortString(buffer)
     val isInit = buffer.get()
-    val ackTimeout = buffer.getInt
+    val ackTimeoutMs = buffer.getInt
     val leaderAndISRRequestCount = buffer.getInt
     val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndISR]
 
@@ -69,7 +69,7 @@ object LeaderAndISRRequest {
 
       leaderAndISRInfos.put((topic, partition), leaderAndISRRequest)
     }
-    new LeaderAndISRRequest(versionId, clientId, isInit, ackTimeout, leaderAndISRInfos)
+    new LeaderAndISRRequest(versionId, clientId, isInit, ackTimeoutMs, leaderAndISRInfos)
   }
 }
 
@@ -77,19 +77,19 @@ object LeaderAndISRRequest {
 case class LeaderAndISRRequest (versionId: Short,
                                 clientId: String,
                                 isInit: Byte,
-                                ackTimeout: Int,
+                                ackTimeoutMs: Int,
                                 leaderAndISRInfos:
                                 Map[(String, Int), LeaderAndISR])
         extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) {
-  def this(isInit: Byte, ackTimeout: Int, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = {
-    this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, ackTimeout, leaderAndISRInfos)
+  def this(isInit: Byte, ackTimeoutMs: Int, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = {
+    this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, ackTimeoutMs, leaderAndISRInfos)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
     buffer.put(isInit)
-    buffer.putInt(ackTimeout)
+    buffer.putInt(ackTimeoutMs)
     buffer.putInt(leaderAndISRInfos.size)
     for((key, value) <- leaderAndISRInfos){
       Utils.writeShortString(buffer, key._1, "UTF-8")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Fri Jun 29 22:05:57 2012
@@ -21,6 +21,7 @@ import java.nio._
 import kafka.message._
 import kafka.utils._
 
+
 object ProducerRequest {
   val CurrentVersion: Short = 0
 
@@ -29,7 +30,7 @@ object ProducerRequest {
     val correlationId: Int = buffer.getInt
     val clientId: String = Utils.readShortString(buffer, "UTF-8")
     val requiredAcks: Short = buffer.getShort
-    val ackTimeout: Int = buffer.getInt
+    val ackTimeoutMs: Int = buffer.getInt
     //build the topic structure
     val topicCount = buffer.getInt
     val data = new Array[TopicData](topicCount)
@@ -48,7 +49,7 @@ object ProducerRequest {
       }
       data(i) = new TopicData(topic,partitionData)
     }
-    new ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeout, data)
+    new ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
   }
 }
 
@@ -56,24 +57,24 @@ case class ProducerRequest( versionId: S
                             correlationId: Int,
                             clientId: String,
                             requiredAcks: Short,
-                            ackTimeout: Int,
+                            ackTimeoutMs: Int,
                             data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.Produce)) {
 
-  def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeout: Int, data: Array[TopicData]) =
-    this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeout, data)
+  def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: Array[TopicData]) =
+    this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     buffer.putInt(correlationId)
     Utils.writeShortString(buffer, clientId, "UTF-8")
     buffer.putShort(requiredAcks)
-    buffer.putInt(ackTimeout)
+    buffer.putInt(ackTimeoutMs)
     //save the topic structure
     buffer.putInt(data.size) //the number of topics
     for(topicData <- data) {
       Utils.writeShortString(buffer, topicData.topic, "UTF-8") //write the topic
-      buffer.putInt(topicData.partitionData.size) //the number of partitions
-      for(partitionData <- topicData.partitionData) {
+      buffer.putInt(topicData.partitionDataArray.size) //the number of partitions
+      for(partitionData <- topicData.partitionDataArray) {
         buffer.putInt(partitionData.partition)
         buffer.putInt(partitionData.messages.getSerialized().limit)
         buffer.put(partitionData.messages.getSerialized())
@@ -85,10 +86,10 @@ case class ProducerRequest( versionId: S
   def sizeInBytes: Int = {
     var size = 0 
     //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size
-    size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4;
+    size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4
     for(topicData <- data) {
 	    size += 2 + topicData.topic.length + 4
-      for(partitionData <- topicData.partitionData) {
+      for(partitionData <- topicData.partitionDataArray) {
         size += 4 + 4 + partitionData.messages.sizeInBytes.asInstanceOf[Int]
       }
     }
@@ -102,12 +103,13 @@ case class ProducerRequest( versionId: S
         ( correlationId == that.correlationId &&
           clientId == that.clientId &&
           requiredAcks == that.requiredAcks &&
-          ackTimeout == that.ackTimeout &&
+          ackTimeoutMs == that.ackTimeoutMs &&
           data.toSeq == that.data.toSeq )
       case _ => false
     }
   }
 
-  def topicPartitionCount = data.foldLeft(0)(_ + _.partitionData.length)
+  def topicPartitionCount = data.foldLeft(0)(_ + _.partitionDataArray.length)
+
+}
 
-}
\ No newline at end of file
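
For reference, the fixed header term in sizeInBytes above breaks down as versionId (2 bytes), correlationId (4), the clientId short string (2-byte length prefix plus the string bytes), requiredAcks (2), ackTimeoutMs (4), and the topic count (4). A small standalone sanity check of that arithmetic (the object name is illustrative):

    object ProducerRequestHeaderSize {
      // Mirrors the header term of sizeInBytes in the diff above.
      def headerSize(clientId: String): Int = 2 + 4 + 2 + clientId.length + 2 + 4 + 4

      def main(args: Array[String]): Unit =
        println(headerSize("test")) // 22 bytes for a 4-character client id
    }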

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala Fri Jun 29 22:05:57 2012
@@ -30,29 +30,29 @@ object StopReplicaRequest {
   def readFrom(buffer: ByteBuffer): StopReplicaRequest = {
     val versionId = buffer.getShort
     val clientId = Utils.readShortString(buffer)
-    val ackTimeout = buffer.getInt
+    val ackTimeoutMs = buffer.getInt
     val topicPartitionPairCount = buffer.getInt
     val topicPartitionPairSet = new HashSet[(String, Int)]()
     for (i <- 0 until topicPartitionPairCount){
       topicPartitionPairSet.add((Utils.readShortString(buffer, "UTF-8"), buffer.getInt))
     }
-    new StopReplicaRequest(versionId, clientId, ackTimeout, topicPartitionPairSet)
+    new StopReplicaRequest(versionId, clientId, ackTimeoutMs, topicPartitionPairSet)
   }
 }
 
 case class StopReplicaRequest(versionId: Short,
                               clientId: String,
-                              ackTimeout: Int,
+                              ackTimeoutMs: Int,
                               stopReplicaSet: Set[(String, Int)]
                                      ) extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) {
-  def this(ackTimeout: Int, stopReplicaSet: Set[(String, Int)]) = {
-    this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, ackTimeout, stopReplicaSet)
+  def this(ackTimeoutMs: Int, stopReplicaSet: Set[(String, Int)]) = {
+    this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, ackTimeoutMs, stopReplicaSet)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
-    buffer.putInt(ackTimeout)
+    buffer.putInt(ackTimeoutMs)
     buffer.putInt(stopReplicaSet.size)
     for ((topic, partitionId) <- stopReplicaSet){
       Utils.writeShortString(buffer, topic, "UTF-8")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Fri Jun 29 22:05:57 2012
@@ -38,6 +38,7 @@ object ErrorMapping {
   val NoLeaderForPartitionCode : Short = 6
   val NotLeaderForPartitionCode : Short = 7
   val UnknownTopicCode : Short = 8
+  val RequestTimedOutCode: Short = 9
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -48,6 +49,7 @@ object ErrorMapping {
       classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
       classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
       classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode,
+      classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
       classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode
     ).withDefaultValue(UnknownCode)
   

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/RequestTimedOutException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/RequestTimedOutException.scala?rev=1355578&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/RequestTimedOutException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/RequestTimedOutException.scala Fri Jun 29 22:05:57 2012
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+
+/**
+ * Thrown when a produce request times out - i.e., when one or more of the
+ * partitions it sends messages to receive fewer than the requiredAcks
+ * specified in the produce request.
+ */
+class RequestTimedOutException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file
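
The new exception pairs with RequestTimedOutCode (9) in ErrorMapping above: it is surfaced when a produce request gathers fewer than requiredAcks acknowledgements within ackTimeoutMs. A standalone sketch of the intended catch-and-retry shape; resend() is a hypothetical placeholder, not a Kafka API:

    class RequestTimedOutException(message: String) extends RuntimeException(message) {
      def this() = this(null)
    }

    object TimeoutHandling {
      def resend(): Unit = println("retrying produce request")

      def main(args: Array[String]): Unit = {
        try throw new RequestTimedOutException("0/2 acks received within 3000 ms")
        catch { case _: RequestTimedOutException => resend() }
      }
    }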

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Fri Jun 29 22:05:57 2012
@@ -95,7 +95,7 @@ private[kafka] class ZookeeperConsumerCo
   private var zkClient: ZkClient = null
   private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
   // topicThreadIdAndQueues : (topic,consumerThreadId) -> queue
-  private val topicThreadIdAndQueues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
+  private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
   private val messageStreamCreated = new AtomicBoolean(false)
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala Fri Jun 29 22:05:57 2012
@@ -23,10 +23,10 @@ import java.nio.ByteBuffer
 class ProducerRequest(val correlationId: Int,
                       val clientId: String,
                       val requiredAcks: Short,
-                      val ackTimeout: Int,
+                      val ackTimeoutMs: Int,
                       val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.Produce)) {
 	
-  val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
+  val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
 
   def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Fri Jun 29 22:05:57 2012
@@ -367,11 +367,11 @@ private[kafka] class Log(val dir: File, 
 
   def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
     val segsArray = segments.view
-    var offsetTimeArray: Array[Tuple2[Long, Long]] = null
+    var offsetTimeArray: Array[(Long, Long)] = null
     if (segsArray.last.size > 0)
-      offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length + 1)
+      offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
     else
-      offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length)
+      offsetTimeArray = new Array[(Long, Long)](segsArray.length)
 
     for (i <- 0 until segsArray.length)
       offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Fri Jun 29 22:05:57 2012
@@ -18,13 +18,14 @@
 package kafka.network
 
 import java.util.concurrent._
+import kafka.utils.SystemTime
 
 object RequestChannel { 
   val AllDone = new Request(1, 2, null, 0)
   case class Request(processor: Int, requestKey: Any, request: Receive, start: Long)
-  case class Response(processor: Int, requestKey: Any, response: Send, start: Long, elapsed: Long) {
-    def this(request: Request, send: Send, ellapsed: Long) = 
-      this(request.processor, request.requestKey, send, request.start, ellapsed)
+  case class Response(processor: Int, requestKey: Any, response: Send, start: Long, elapsedNs: Long) {
+    def this(request: Request, send: Send) =
+      this(request.processor, request.requestKey, send, request.start, SystemTime.nanoseconds - request.start)
   }
 }
 

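With this change a Response computes its own elapsed time from the request's start timestamp instead of callers threading an elapsed value through (previously often just -1, as the KafkaApis hunks below show). A condensed standalone model of the constructor change, with System.nanoTime standing in for SystemTime:

    case class Request(start: Long)
    case class Response(start: Long, elapsedNs: Long) {
      def this(request: Request) = this(request.start, System.nanoTime - request.start)
    }

    object ResponseTiming {
      def main(args: Array[String]): Unit = {
        val req = Request(System.nanoTime)
        Thread.sleep(5)
        println("elapsed ns: " + new Response(req).elapsedNs)
      }
    }
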
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Fri Jun 29 22:05:57 2012
@@ -101,7 +101,7 @@ class SyncProducer(val config: SyncProdu
    */
   def send(producerRequest: ProducerRequest): ProducerResponse = {
     for( topicData <- producerRequest.data ) {
-      for( partitionData <- topicData.partitionData ) {
+      for( partitionData <- topicData.partitionDataArray ) {
 	      verifyMessageSize(partitionData.messages)
         val setSize = partitionData.messages.sizeInBytes.asInstanceOf[Int]
         trace("Got message set with " + setSize + " bytes to send")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Fri Jun 29 22:05:57 2012
@@ -46,21 +46,28 @@ trait SyncProducerConfigShared {
   val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
 
   /* the client application sending the producer requests */
-  val correlationId = Utils.getInt(props,"producer.request.correlation_id",-1)
+  val correlationId = Utils.getInt(props,"producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId)
 
   /* the client application sending the producer requests */
-  val clientId = Utils.getString(props,"producer.request.client_id","")
+  val clientId = Utils.getString(props,"producer.request.client_id",SyncProducerConfig.DefaultClientId)
 
-  /* the required_acks of the producer requests */
-  val requiredAcks = Utils.getShort(props,"producer.request.required_acks",0)
+  /*
+   * The required acks of the producer requests - negative value means ack
+   * after the replicas in ISR have caught up to the leader's offset
+   * corresponding to this produce request.
+   */
+  val requiredAcks = Utils.getShort(props,"producer.request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
 
-  /* the ack_timeout of the producer requests */
-  val ackTimeout = Utils.getInt(props,"producer.request.ack_timeout",1)
+  /*
+   * The ack timeout of the producer requests - negative value means wait
+   * indefinitely (or until an ack is received).
+   */
+  val ackTimeoutMs = Utils.getInt(props,"producer.request.ack.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs)
 }
 
 object SyncProducerConfig {
   val DefaultCorrelationId = -1
   val DefaultClientId = ""
   val DefaultRequiredAcks : Short = 0
-  val DefaultAckTimeoutMs = 1
+  val DefaultAckTimeoutMs = -1
 }
\ No newline at end of file
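
Note the semantics these defaults now carry: required acks of 0 remains fire-and-forget, a positive value waits for that many replicas, a negative value waits for the entire ISR, and the new default ack timeout of -1 means wait indefinitely. A hedged sketch of setting the renamed properties (only the two property names come from this diff; the harness around them is illustrative):

    import java.util.Properties

    object AckConfigExample {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("producer.request.required.acks", "-1")    // ack only after the full ISR catches up
        props.put("producer.request.ack.timeout.ms", "3000") // fail the request after 3 s
        println(props)
      }
    }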

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Fri Jun 29 22:05:57 2012
@@ -44,7 +44,7 @@ class DefaultEventHandler[K,V](config: P
     lock synchronized {
       val serializedData = serialize(events)
       var outstandingProduceRequests = serializedData
-      var remainingRetries = config.producerRetries
+      var remainingRetries = config.producerRetries + 1
       while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
         outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
         if (outstandingProduceRequests.size > 0)  {
@@ -171,7 +171,7 @@ class DefaultEventHandler[K,V](config: P
         partitionData.append(new PartitionData(partitionId, messagesSet))
       }
       val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray)).toArray
-      val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, config.ackTimeout, topicData)
+      val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, config.ackTimeoutMs, topicData)
       try {
         val syncProducer = producerPool.getProducer(brokerId)
         val response = syncProducer.send(producerRequest)
@@ -179,7 +179,7 @@ class DefaultEventHandler[K,V](config: P
           .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
         var msgIdx = -1
         val errors = new ListBuffer[(String, Int)]
-        for( topic <- topicData; partition <- topic.partitionData ) {
+        for( topic <- topicData; partition <- topic.partitionDataArray ) {
           msgIdx += 1
           if(msgIdx > response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError)
             errors.append((topic.topic, partition.partition))
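
Adding 1 here makes producerRetries mean retries beyond the first attempt, so a setting of 0 still sends once. A minimal model of the corrected loop, assuming the worst case where every dispatch fails:

    object RetryModel {
      def attempts(producerRetries: Int): Int = {
        var remaining = producerRetries + 1 // retries are on top of the first send
        var tried = 0
        while (remaining > 0) { // pretend every dispatch leaves data outstanding
          tried += 1
          remaining -= 1
        }
        tried
      }

      def main(args: Array[String]): Unit =
        println(attempts(3)) // 4 attempts: the original send plus three retries
    }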

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Fri Jun 29 22:05:57 2012
@@ -73,7 +73,7 @@ abstract class AbstractFetcherThread(val
         // process fetched data
         fetchMapLock synchronized {
           for ( topicData <- response.data ) {
-            for ( partitionData <- topicData.partitionData) {
+            for ( partitionData <- topicData.partitionDataArray) {
               val topic = topicData.topic
               val partitionId = partitionData.partition
               val key = (topic, partitionId)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Fri Jun 29 22:05:57 2012
@@ -33,13 +33,15 @@ import scala.math._
 import java.lang.IllegalStateException
 import kafka.network.RequestChannel.Response
 
-
 /**
  * Logic to handle the various Kafka requests
  */
-class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
-                val replicaManager: ReplicaManager, val kafkaZookeeper: KafkaZooKeeper) extends Logging {
+class KafkaApis(val requestChannel: RequestChannel,
+                val logManager: LogManager,
+                val replicaManager: ReplicaManager,
+                val kafkaZookeeper: KafkaZooKeeper) extends Logging {
 
+  private val produceRequestPurgatory = new ProducerRequestPurgatory
   private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
@@ -70,7 +72,7 @@ class KafkaApis(val requestChannel: Requ
     }
 
     val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse), -1))
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse)))
   }
 
 
@@ -83,9 +85,25 @@ class KafkaApis(val requestChannel: Requ
       responseMap.put((topic, partition), ErrorMapping.NoError)
     }
     val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse), -1))
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
   }
 
+  /**
+   * Check if the partitionDataArray from a produce request can unblock any
+   * DelayedFetch requests.
+   */
+  def maybeUnblockDelayedFetchRequests(topic: String, partitionDatas: Array[PartitionData]) {
+    var satisfied = new mutable.ArrayBuffer[DelayedFetch]
+    for(partitionData <- partitionDatas)
+      satisfied ++= fetchRequestPurgatory.update((topic, partitionData.partition), partitionData)
+    trace("Produce request to %s unblocked %d DelayedFetchRequests.".format(topic, satisfied.size))
+    // send any newly unblocked responses
+    for(fetchReq <- satisfied) {
+      val topicData = readMessageSets(fetchReq.fetch)
+      val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
+      requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
+    }
+  }
 
   /**
    * Handle a produce request
@@ -96,39 +114,62 @@ class KafkaApis(val requestChannel: Requ
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Producer request " + request.toString)
 
-    val response = produce(produceRequest)
-    debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms")
-    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1))
-    
-    // Now check any outstanding fetches this produce just unblocked
-    var satisfied = new mutable.ArrayBuffer[DelayedFetch]
-    for(topicData <- produceRequest.data) {
-      for(partition <- topicData.partitionData)
-        satisfied ++= fetchRequestPurgatory.update((topicData.topic, partition.partition), topicData)
+    val response = produceToLocalLog(produceRequest)
+    debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+
+    if (produceRequest.requiredAcks == 0 || produceRequest.requiredAcks == 1) {
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+
+      for (topicData <- produceRequest.data)
+        maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray)
     }
-    // send any newly unblocked responses
-    for(fetchReq <- satisfied) {
-       val topicData = readMessageSets(fetchReq.fetch)
-       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
-      requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response), -1))
+    else {
+      // create a list of (topic, partition) pairs to use as keys for this delayed request
+      val topicPartitionPairs = produceRequest.data.flatMap(topicData => {
+        val topic = topicData.topic
+        topicData.partitionDataArray.map(partitionData => {
+          (topic, partitionData.partition)
+        })
+      })
+
+      val delayedProduce = new DelayedProduce(
+        topicPartitionPairs, request,
+        response.errors, response.offsets,
+        produceRequest, produceRequest.ackTimeoutMs.toLong)
+      produceRequestPurgatory.watch(delayedProduce)
+
+      /*
+       * Replica fetch requests may have arrived (and potentially satisfied)
+       * delayedProduce requests before they even made it to the purgatory.
+       * Here, we explicitly check if any of them can be satisfied.
+       */
+      var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
+      topicPartitionPairs.foreach(topicPartition =>
+        satisfiedProduceRequests ++=
+          produceRequestPurgatory.update(topicPartition, topicPartition))
+      debug(satisfiedProduceRequests.size +
+        " DelayedProduce requests unblocked after produce to local log.")
+      satisfiedProduceRequests.foreach(_.respond())
     }
   }
 
   /**
    * Helper method for handling a parsed producer request
    */
-  private def produce(request: ProducerRequest): ProducerResponse = {
+  private def produceToLocalLog(request: ProducerRequest): ProducerResponse = {
     val requestSize = request.topicPartitionCount
     val errors = new Array[Short](requestSize)
     val offsets = new Array[Long](requestSize)
 
     var msgIndex = -1
     for(topicData <- request.data) {
-      for(partitionData <- topicData.partitionData) {
+      for(partitionData <- topicData.partitionDataArray) {
         msgIndex += 1
         BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes)
         BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes)
         try {
+          // TODO: should use replicaManager for ensurePartitionLeaderOnThisBroker?
+          // although this ties in with KAFKA-352
           kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
           val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
           log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
@@ -169,25 +210,38 @@ class KafkaApis(val requestChannel: Requ
     } catch {
       case e:FetchRequestFormatException =>
         val response = new FetchResponse(fetchRequest.versionId, fetchRequest.correlationId, Array.empty)
-        val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response), -1)
+        val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response))
         requestChannel.sendResponse(channelResponse)
     }
 
-    if(fetchRequest.replicaId != -1)
+    if(fetchRequest.replicaId != FetchRequest.NonFollowerId) {
       maybeUpdatePartitionHW(fetchRequest)
+      // after updating HW, some delayed produce requests may be unblocked
+      var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
+      fetchRequest.offsetInfo.foreach(topicOffsetInfo => {
+        topicOffsetInfo.partitions.foreach(partition => {
+          satisfiedProduceRequests ++= produceRequestPurgatory.update(
+            (topicOffsetInfo.topic, partition), (topicOffsetInfo.topic, partition)
+          )
+        })
+      })
+      trace("Replica %d fetch unblocked %d DelayedProduce requests.".format(
+        fetchRequest.replicaId, satisfiedProduceRequests.size))
+      satisfiedProduceRequests.foreach(_.respond())
+    }
 
     // if there are enough bytes available right now we can answer the request, otherwise we have to punt
     val availableBytes = availableFetchBytes(fetchRequest)
     if(fetchRequest.maxWait <= 0 || availableBytes >= fetchRequest.minBytes) {
       val topicData = readMessageSets(fetchRequest)
       debug("Returning fetch response %s for fetch request with correlation id %d"
-        .format(topicData.map(_.partitionData.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
+        .format(topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
-      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response), -1))
+      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
     } else {
       // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val keys: Seq[Any] = fetchRequest.offsetInfo.flatMap(o => o.partitions.map((o.topic, _)))
-      val delayedFetch = new DelayedFetch(keys, request, fetchRequest, fetchRequest.maxWait, availableBytes)
+      val topicPartitionPairs: Seq[Any] = fetchRequest.offsetInfo.flatMap(o => o.partitions.map((o.topic, _)))
+      val delayedFetch = new DelayedFetch(topicPartitionPairs, request, fetchRequest, fetchRequest.maxWait, availableBytes)
       fetchRequestPurgatory.watch(delayedFetch)
     }
   }
@@ -258,7 +312,7 @@ class KafkaApis(val requestChannel: Requ
               " must exist on leader broker %d".format(logManager.config.brokerId))
             val leaderReplica = leaderReplicaOpt.get
             fetchRequest.replicaId match {
-              case -1 => // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
+              case FetchRequest.NonFollowerId => // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
                 new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
               case _ => // fetch request from a follower
                 val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId)
@@ -305,7 +359,7 @@ class KafkaApis(val requestChannel: Requ
       requestLogger.trace("Offset request " + offsetRequest.toString)
     val offsets = logManager.getOffsets(offsetRequest)
     val response = new OffsetResponse(offsetRequest.versionId, offsets)
-    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1))
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
   /**
@@ -343,7 +397,7 @@ class KafkaApis(val requestChannel: Requ
     }
     info("Sending response for topic metadata request")
     val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq)
-    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1))
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
   def close() {
@@ -355,18 +409,18 @@ class KafkaApis(val requestChannel: Requ
    */
   class DelayedFetch(keys: Seq[Any], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long) extends DelayedRequest(keys, request, delayMs) {
     val bytesAccumulated = new AtomicLong(initialSize)
-   }
+  }
 
   /**
    * A holding pen for fetch requests waiting to be satisfied
    */
-  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, TopicData] {
+  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, PartitionData] {
     
     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
      */
-    def checkSatisfied(topicData: TopicData, delayedFetch: DelayedFetch): Boolean = {
-      val messageDataSize = topicData.partitionData.map(_.messages.sizeInBytes).sum
+    def checkSatisfied(partitionData: PartitionData, delayedFetch: DelayedFetch): Boolean = {
+      val messageDataSize = partitionData.messages.sizeInBytes
       val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageDataSize)
       accumulatedSize >= delayedFetch.fetch.minBytes
     }
@@ -377,7 +431,151 @@ class KafkaApis(val requestChannel: Requ
     def expire(delayed: DelayedFetch) {
       val topicData = readMessageSets(delayed.fetch)
       val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
-      requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response), -1))
+      requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
+    }
+  }
+
+  class DelayedProduce(keys: Seq[Any],
+                       request: RequestChannel.Request,
+                       localErrors: Array[Short],
+                       requiredOffsets: Array[Long],
+                       val produce: ProducerRequest,
+                       delayMs: Long)
+    extends DelayedRequest(keys, request, delayMs) with Logging {
+
+    /**
+     * Map of (topic, partition) -> partition status
+     * The values in this map don't need to be synchronized since updates to the
+     * values are effectively synchronized by the ProducerRequestPurgatory's
+     * update method
+     */
+    private val partitionStatus = keys.map(key => {
+      val keyIndex = keys.indexOf(key)
+      // if there was an error in writing to the local replica's log, then don't
+      // wait for acks on this partition
+      val acksPending =
+        if (localErrors(keyIndex) == ErrorMapping.NoError) {
+          // Timeout error state will be cleared when requiredAcks are received
+          localErrors(keyIndex) = ErrorMapping.RequestTimedOutCode
+          true
+        }
+        else
+          false
+
+      val initialStatus = new PartitionStatus(acksPending, localErrors(keyIndex), requiredOffsets(keyIndex))
+      trace("Initial partition status for %s = %s".format(key, initialStatus))
+      (key, initialStatus)
+    }).toMap
+
+
+    def respond() {
+      val errorsAndOffsets: (List[Short], List[Long]) = (
+        keys.foldRight
+          ((List[Short](), List[Long]()))
+          ((key: Any, result: (List[Short], List[Long])) => {
+            val status = partitionStatus(key)
+            (status.error :: result._1, status.requiredOffset :: result._2)
+          })
+        )
+      val response = new ProducerResponse(produce.versionId, produce.correlationId,
+        errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray)
+
+      requestChannel.sendResponse(new RequestChannel.Response(
+        request, new BoundedByteBufferSend(response)))
+    }
+
+    /**
+     * Returns true if this delayed produce request is satisfied (or more
+     * accurately, unblocked) -- this is the case if for every partition:
+     * Case A: This broker is not the leader: unblock - should return error.
+     * Case B: This broker is the leader:
+     *   B.1 - If there was a localError (when writing to the local log): unblock - should return error
+     *   B.2 - else, at least requiredAcks replicas should be caught up to this request.
+     *
+     * As partitions become acknowledged, we may be able to unblock
+     * DelayedFetchRequests that are pending on those partitions.
+     */
+    def isSatisfied(followerFetchPartition: (String, Int)) = {
+      val (topic, partitionId) = followerFetchPartition
+      val fetchPartitionStatus = partitionStatus(followerFetchPartition)
+      if (fetchPartitionStatus.acksPending) {
+        val leaderReplica = replicaManager.getLeaderReplica(topic, partitionId)
+        leaderReplica match {
+          case Some(leader) => {
+            if (leader.isLocal) {
+              val isr = leader.partition.inSyncReplicas
+              val numAcks = isr.count(r => {
+                if (!r.isLocal)
+                  r.logEndOffset() >= partitionStatus(followerFetchPartition).requiredOffset
+                else
+                  true /* also count the local (leader) replica */
+              })
+              trace("Received %d/%d acks for produce request to %s-%d".format(
+                numAcks, produce.requiredAcks,
+                topic, partitionId))
+              if ((produce.requiredAcks < 0 && numAcks >= isr.size) ||
+                  (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
+                /*
+                 * requiredAcks < 0 means acknowledge after all replicas in ISR
+                 * are fully caught up to the (local) leader's offset
+                 * corresponding to this produce request.
+                 */
+                fetchPartitionStatus.acksPending = false
+                fetchPartitionStatus.error = ErrorMapping.NoError
+                val topicData =
+                  produce.data.find(_.topic == topic).get
+                val partitionData =
+                  topicData.partitionDataArray.find(_.partition == partitionId).get
+                maybeUnblockDelayedFetchRequests(
+                  topic, Array(partitionData))
+              }
+            }
+            else {
+              debug("Broker not leader for %s-%d".format(topic, partitionId))
+              fetchPartitionStatus.setThisBrokerNotLeader()
+            }
+          }
+          case None =>
+            debug("Broker not leader for %s-%d".format(topic, partitionId))
+            fetchPartitionStatus.setThisBrokerNotLeader()
+        }
+      }
+
+      // unblocked if there are no partitions with pending acks
+      ! partitionStatus.exists(p => p._2.acksPending)
+    }
+
+    class PartitionStatus(var acksPending: Boolean,
+                          var error: Short,
+                          val requiredOffset: Long) {
+      def setThisBrokerNotLeader() {
+        error = ErrorMapping.NotLeaderForPartitionCode
+        acksPending = false
+      }
+
+      override def toString =
+        "acksPending:%b, error: %d, requiredOffset: %d".format(
+        acksPending, error, requiredOffset
+        )
+    }
+  }
+
+  /**
+   * A holding pen for produce requests waiting to be satisfied.
+   */
+  private [kafka] class ProducerRequestPurgatory
+    extends RequestPurgatory[DelayedProduce, (String, Int)] {
+
+    protected def checkSatisfied(fetchRequestPartition: (String, Int),
+                                 delayedProduce: DelayedProduce) =
+      delayedProduce.isSatisfied(fetchRequestPartition)
+
+    /**
+     * Handle an expired delayed request
+     */
+    protected def expire(delayedProduce: DelayedProduce) {
+      delayedProduce.respond()
     }
   }
 }
+
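
This hunk is the heart of KAFKA-353: a produce request whose requiredAcks is neither 0 nor 1 is parked in the new ProducerRequestPurgatory, and each follower fetch re-runs isSatisfied to count how many ISR replicas have reached the offset the request needs. A condensed, self-contained model of that ack-counting predicate (plain maps stand in for Kafka's Replica and Partition types):

    object AckCheckModel {
      // isrEndOffsets: ISR replica id -> log end offset; the local leader
      // always counts as one ack, as in the "also count the local replica"
      // branch of isSatisfied above.
      def enoughAcks(requiredAcks: Int,
                     requiredOffset: Long,
                     leaderId: Int,
                     isrEndOffsets: Map[Int, Long]): Boolean = {
        val numAcks = isrEndOffsets.count { case (id, leo) =>
          id == leaderId || leo >= requiredOffset
        }
        if (requiredAcks < 0) numAcks >= isrEndOffsets.size // full-ISR mode
        else numAcks >= requiredAcks
      }

      def main(args: Array[String]): Unit = {
        val isr = Map(0 -> 42L, 1 -> 42L, 2 -> 40L) // leader is broker 0; broker 2 lags
        println(enoughAcks(2, 42L, 0, isr))  // true: leader plus broker 1 give 2 acks
        println(enoughAcks(-1, 42L, 0, isr)) // false: full ISR required, broker 2 is behind
      }
    }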

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Fri Jun 29 22:05:57 2012
@@ -26,6 +26,7 @@ import atomic.AtomicBoolean
 import kafka.cluster.Replica
 import org.I0Itec.zkclient.ZkClient
 
+
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
  * to start up and shutdown a single Kafka node.

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Fri Jun 29 22:05:57 2012
@@ -54,4 +54,4 @@ class ReplicaFetcherThread(name:String, 
   def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit = {
     // no handler needed since the controller will make the changes accordingly
   }
-}
\ No newline at end of file
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/DelayedItem.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/DelayedItem.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/DelayedItem.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/DelayedItem.scala Fri Jun 29 22:05:57 2012
@@ -20,11 +20,15 @@ package kafka.utils
 import java.util.concurrent._
 import scala.math._
 
-class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed {
-  
-  val delayMs = unit.toMillis(delay)
-  val createdMs = System.currentTimeMillis
-  
+class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed with Logging {
+
+  val createdMs = SystemTime.milliseconds
+  val delayMs = {
+    val given = unit.toMillis(delay)
+    if (given < 0 || (createdMs + given) < 0) (Long.MaxValue - createdMs)
+    else given
+  }
+
   def this(item: T, delayMs: Long) = 
     this(item, delayMs, TimeUnit.MILLISECONDS)
 
@@ -32,18 +36,18 @@ class DelayedItem[T](val item: T, delay:
    * The remaining delay time
    */
   def getDelay(unit: TimeUnit): Long = {
-    val ellapsedMs = (System.currentTimeMillis - createdMs)
-    unit.convert(max(delayMs - ellapsedMs, 0), unit)
+    val elapsedMs = (SystemTime.milliseconds - createdMs)
+    unit.convert(max(delayMs - elapsedMs, 0), unit)
   }
     
   def compareTo(d: Delayed): Int = {
     val delayed = d.asInstanceOf[DelayedItem[T]]
     val myEnd = createdMs + delayMs
     val yourEnd = delayed.createdMs + delayed.delayMs
-    
+
     if(myEnd < yourEnd) -1
     else if(myEnd > yourEnd) 1
-    else 0 
+    else 0
   }
   
 }
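
The new clamp matters because compareTo orders items by createdMs + delayMs: a negative delay (such as the new wait-indefinitely ack timeout of -1) or an overflowing sum would otherwise corrupt the delay-queue ordering. A small standalone demonstration of the guard:

    object DelayClamp {
      // Mirrors the guard in the diff: pinned values keep createdMs + delayMs
      // at Long.MaxValue so ordering stays sane.
      def clamp(createdMs: Long, given: Long): Long =
        if (given < 0 || createdMs + given < 0) Long.MaxValue - createdMs
        else given

      def main(args: Array[String]): Unit = {
        val now = System.currentTimeMillis
        println(clamp(now, -1))            // pinned: effectively an infinite delay
        println(clamp(now, Long.MaxValue)) // pinned: the sum would overflow
        println(clamp(now, 3000))          // 3000: ordinary delays pass through
      }
    }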

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Throttler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Throttler.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Throttler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Throttler.scala Fri Jun 29 22:05:57 2012
@@ -53,17 +53,17 @@ class Throttler(val desiredRatePerSec: D
     lock synchronized {
       observedSoFar += observed
       val now = time.nanoseconds
-      val ellapsedNs = now - periodStartNs
+      val elapsedNs = now - periodStartNs
       // if we have completed an interval AND we have observed something, maybe
       // we should take a little nap
-      if(ellapsedNs > checkIntervalMs * Time.NsPerMs && observedSoFar > 0) {
-        val rateInSecs = (observedSoFar * Time.NsPerSec) / ellapsedNs
+      if(elapsedNs > checkIntervalMs * Time.NsPerMs && observedSoFar > 0) {
+        val rateInSecs = (observedSoFar * Time.NsPerSec) / elapsedNs
         val needAdjustment = !(throttleDown ^ (rateInSecs > desiredRatePerSec))
         if(needAdjustment) {
           // solve for the amount of time to sleep to make us hit the desired rate
           val desiredRateMs = desiredRatePerSec / Time.MsPerSec.asInstanceOf[Double]
-          val ellapsedMs = ellapsedNs / Time.NsPerMs
-          val sleepTime = round(observedSoFar / desiredRateMs - ellapsedMs)
+          val elapsedMs = elapsedNs / Time.NsPerMs
+          val sleepTime = round(observedSoFar / desiredRateMs - elapsedMs)
           if(sleepTime > 0) {
             Throttler.debug("Natural rate is " + rateInSecs + " per second but desired rate is " + desiredRatePerSec + 
                                      ", sleeping for " + sleepTime + " ms to compensate.")

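With the spelling fixed, the sleep formula is easier to read: observedSoFar / desiredRateMs is how long the observed work should have taken at the target rate, and subtracting elapsedMs gives the nap that brings the throttler back to that rate. A worked check of the arithmetic under assumed numbers:

    object ThrottleMath {
      def sleepMs(observed: Double, desiredPerSec: Double, elapsedMs: Double): Long = {
        val desiredPerMs = desiredPerSec / 1000.0 // same units as desiredRateMs above
        math.round(observed / desiredPerMs - elapsedMs)
      }

      def main(args: Array[String]): Unit =
        // 5000 units in 200 ms against a 10000/s target should have taken
        // 500 ms, so the throttler sleeps the remaining 300 ms.
        println(sleepMs(5000, 10000, 200)) // 300
    }
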
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Fri Jun 29 22:05:57 2012
@@ -613,12 +613,12 @@ object Utils extends Logging {
       v
   }
 
-  def getHostPort(hostport: String) : Tuple2[String, Int] = {
+  def getHostPort(hostport: String) : (String, Int) = {
     val splits = hostport.split(":")
     (splits(0), splits(1).toInt)
   }
 
-  def getTopicPartition(topicPartition: String) : Tuple2[String, Int] = {
+  def getTopicPartition(topicPartition: String) : (String, Int) = {
     val index = topicPartition.lastIndexOf('-')
     (topicPartition.substring(0,index), topicPartition.substring(index+1).toInt)
   }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala Fri Jun 29 22:05:57 2012
@@ -56,11 +56,11 @@ object TestEndToEndLatency {
       var begin = System.nanoTime
       producer.send(new ProducerData(topic, message))
       val received = iter.next
-      val ellapsed = System.nanoTime - begin
+      val elapsed = System.nanoTime - begin
       // poor man's progress bar
       if(i % 10000 == 0)
-        println(i + "\t" + ellapsed / 1000.0 / 1000.0)
-      totalTime += ellapsed
+        println(i + "\t" + elapsed / 1000.0 / 1000.0)
+      totalTime += elapsed
     }
     println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0)) + "ms"
     producer.close()
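
The timing pattern generalizes beyond Kafka; a standalone sketch of the same measurement loop, with a sleep standing in for the send/receive round trip this test performs:

    val numMessages = 3
    var totalTime = 0L
    for (i <- 0 until numMessages) {
      val begin = System.nanoTime
      Thread.sleep(1)                        // stand-in for send + iter.next
      totalTime += System.nanoTime - begin   // accumulate nanoseconds
    }
    println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0) + "ms")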

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala Fri Jun 29 22:05:57 2012
@@ -69,8 +69,8 @@ object TestLinearWriteSpeed {
       buffer.rewind()
       channels(i % numFiles).write(buffer)
     }
-    val ellapsedSecs = (System.currentTimeMillis - begin) / 1000.0
-    System.out.println(bytesToWrite / (1024 * 1024 * ellapsedSecs) + " MB per sec")
+    val elapsedSecs = (System.currentTimeMillis - begin) / 1000.0
+    System.out.println(bytesToWrite / (1024 * 1024 * elapsedSecs) + " MB per sec")
   }
   
 }
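
The throughput line reduces to simple unit arithmetic; a worked sketch with illustrative numbers:

    val bytesToWrite = 100L * 1024 * 1024   // 100 MB written
    val elapsedSecs = 2.0                   // wall-clock seconds
    println(bytesToWrite / (1024 * 1024 * elapsedSecs) + " MB per sec")  // 50.0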

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala Fri Jun 29 22:05:57 2012
@@ -43,10 +43,10 @@ object TestLogPerformance {
     for(i <- 0 until numBatches)
       log.append(messageSet)
     log.close()
-    val ellapsed = (System.currentTimeMillis() - start) / 1000.0
+    val elapsed = (System.currentTimeMillis() - start) / 1000.0
     val writtenBytes = MessageSet.entrySize(message) * numMessages
     println("message size = " + MessageSet.entrySize(message))
-    println("MB/sec: " + writtenBytes / ellapsed / (1024.0 * 1024.0))
+    println("MB/sec: " + writtenBytes / elapsed / (1024.0 * 1024.0))
     Utils.rm(dir)
   }
   

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Fri Jun 29 22:05:57 2012
@@ -30,6 +30,8 @@ import org.apache.log4j.{Level, Logger}
 import org.junit.Assert._
 import org.junit.Test
 import kafka.utils._
+import java.util
+
 
 class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   private val brokerId1 = 0
@@ -85,22 +87,31 @@ class ProducerTest extends JUnit3Suite w
 
   @Test
   def testZKSendToNewTopic() {
-    val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    val props1 = new util.Properties()
+    props1.put("serializer.class", "kafka.serializer.StringEncoder")
+    props1.put("partitioner.class", "kafka.utils.StaticPartitioner")
+    props1.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props1.put("producer.request.required.acks", "2")
+    props1.put("producer.request.ack.timeout.ms", "-1")
+
+    val props2 = new util.Properties()
+    props2.putAll(props1)
+    props2.put("producer.request.required.acks", "3")
+    props2.put("producer.request.ack.timeout.ms", "1000")
 
-    val config = new ProducerConfig(props)
+    val config1 = new ProducerConfig(props1)
+    val config2 = new ProducerConfig(props2)
 
     // create topic with 1 partition and await leadership
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 1)
+    CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
     TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
 
-    val producer = new Producer[String, String](config)
+    val producer1 = new Producer[String, String](config1)
+    val producer2 = new Producer[String, String](config2)
     try {
       // Available partition ids should be 0.
-      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
       // get the leader
       val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0)
       assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
@@ -118,13 +129,27 @@ class ProducerTest extends JUnit3Suite w
       assertEquals(new Message("test1".getBytes), messageSet.next.message)
       assertTrue("Message set should have 1 message", messageSet.hasNext)
       assertEquals(new Message("test1".getBytes), messageSet.next.message)
+      assertFalse("Message set should not have any more messages", messageSet.hasNext)
     } catch {
       case e: Exception => fail("Not expected", e)
     } finally {
-      producer.close
+      producer1.close()
+    }
+
+    try {
+      producer2.send(new ProducerData[String, String]("new-topic", "test", Array("test2")))
+      fail("Should have timed out for 3 acks.")
+    }
+    catch {
+      case se: FailedToSendMessageException => true
+      case e => fail("Not expected", e)
+    }
+    finally {
+      producer2.close()
     }
   }
 
+
   @Test
   def testZKSendWithDeadBroker() {
     val props = new Properties()
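
The new test encodes the ack contract this patch introduces: with a replication factor of two, requiring two acks succeeds, while requiring three can never be satisfied and surfaces as a send failure. A minimal client-side sketch using the property names from the diff (the ZooKeeper endpoint and topic are illustrative, and FailedToSendMessageException is assumed to live in kafka.common):

    import java.util.Properties
    import kafka.producer.{Producer, ProducerConfig, ProducerData}

    val props = new Properties()
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("zk.connect", "localhost:2181")
    props.put("producer.request.required.acks", "2")      // wait for 2 replicas
    props.put("producer.request.ack.timeout.ms", "1000")  // broker-side wait bound

    val producer = new Producer[String, String](new ProducerConfig(props))
    try {
      producer.send(new ProducerData[String, String]("new-topic", "test", Array("m1")))
    } catch {
      // raised when the broker cannot gather the required acks in time
      case e: kafka.common.FailedToSendMessageException => // handle or retry
    } finally {
      producer.close()
    }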

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala Fri Jun 29 22:05:57 2012
@@ -39,7 +39,7 @@ class RequestPurgatoryTest {
   def teardown() {
     purgatory.shutdown()
   }
-  
+
   @Test
   def testRequestSatisfaction() {
     val r1 = new DelayedRequest(Array("test1"), null, 100000L)
@@ -66,11 +66,11 @@ class RequestPurgatoryTest {
     purgatory.watch(r1)
     purgatory.watch(r2)
     purgatory.awaitExpiration(r1)
-    val ellapsed = System.currentTimeMillis - start
-    println(ellapsed)
+    val elapsed = System.currentTimeMillis - start
+    println(elapsed)
     assertTrue("r1 expired", purgatory.expired.contains(r1))
     assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2))
-    assertTrue("Time for expiration was about 20ms", (ellapsed - expiration).abs < 10L)
+    assertTrue("Time for expiration was about 20ms", (elapsed - expiration).abs < 10L)
   }
   
   class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] {
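
The hunk is truncated before MockRequestPurgatory's body, but the expiration path it exercises follows the DelayedItem ordering shown earlier. A hedged sketch of the interaction, assuming the test's MockRequestPurgatory and a 20 ms delay:

    // Hypothetical: a request that is never satisfied must expire instead.
    val purgatory = new MockRequestPurgatory()
    val expiration = 20L
    val request = new DelayedRequest(Array("test"), null, expiration)
    purgatory.watch(request)            // register for satisfaction and expiry
    purgatory.awaitExpiration(request)  // blocks ~20 ms until expire() runs
    assert(purgatory.expired.contains(request))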

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1355578&r1=1355577&r2=1355578&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Fri Jun 29 22:05:57 2012
@@ -343,9 +343,9 @@ object TestUtils extends Logging {
   def produceRequestWithAcks(topics: Seq[String], partitions: Seq[Int], message: ByteBufferMessageSet, acks: Int): kafka.api.ProducerRequest = {
     val correlationId = SyncProducerConfig.DefaultCorrelationId
     val clientId = SyncProducerConfig.DefaultClientId
-    val ackTimeout = SyncProducerConfig.DefaultAckTimeoutMs
+    val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val data = topics.map(new TopicData(_, partitions.map(new PartitionData(_, message)).toArray))
-    new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeout, data.toArray)
+    new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, data.toArray)
   }
 
   def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
@@ -359,12 +359,12 @@ object TestUtils extends Logging {
   def produceJavaRequest(correlationId: Int, topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
     val clientId = "test"
     val requiredAcks: Short = 0
-    val ackTimeout = 0
+    val ackTimeoutMs = 0
     var data = new Array[TopicData](1)
     var partitionData = new Array[PartitionData](1)
     partitionData(0) = new PartitionData(partition,message.underlying)
     data(0) = new TopicData(topic,partitionData)
-    val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)  	
+    val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
     pr
   }
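
For reference, a sketch of assembling a single-topic, single-partition request with the constructor used above (messageSet is assumed to be a ByteBufferMessageSet already in scope):

    // Illustrative: require one ack, with a 500 ms broker-side wait.
    val data = Array(new TopicData("new-topic", Array(new PartitionData(0, messageSet))))
    val request = new kafka.api.ProducerRequest(
      SyncProducerConfig.DefaultCorrelationId,
      SyncProducerConfig.DefaultClientId,
      1.toShort,  // requiredAcks
      500,        // ackTimeoutMs
      data)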
 


