kafka-commits mailing list archives

From jun...@apache.org
Subject svn commit: r1243407 [1/2] - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/ core/src/main/scala/kafka/javaapi/cons...
Date Mon, 13 Feb 2012 03:58:39 GMT
Author: junrao
Date: Mon Feb 13 03:58:37 2012
New Revision: 1243407

URL: http://svn.apache.org/viewvc?rev=1243407&view=rev
Log:
new consumer request format; patched by Prashanth Menon; reviewed by Jun Rao and Jay Kreps; KAFKA-240

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiFetchRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiFetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala
Modified:
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
    incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/KafkaProperties.java
    incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
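
The patch replaces the old one-partition FetchRequest and the separate multifetch path with a single FetchRequest that carries any number of (topic, partition, offset, fetchSize) entries, assembled through the new FetchRequestBuilder; SimpleConsumer.fetch now returns a FetchResponse rather than a raw message set. A minimal client-side sketch of the new API, assuming a hypothetical broker at localhost:9092 and a hypothetical topic "test":

    import kafka.api.FetchRequestBuilder
    import kafka.consumer.SimpleConsumer

    // Broker location, timeout, buffer size and topic below are illustrative only.
    val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024)
    val request = new FetchRequestBuilder()
      .correlationId(0)                  // client-chosen id, echoed back in the response
      .clientId("example-client")
      .addFetch("test", 0, 0L, 1000000)  // topic, partition, offset, fetchSize
      .addFetch("test", 1, 0L, 1000000)  // several partitions travel in one request
      .build()
    val response = consumer.fetch(request)
    for (messageAndOffset <- response.messageSet("test", 0))
      println("consumed message at offset " + messageAndOffset.offset)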

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java Mon Feb 13 03:58:37 2012
@@ -16,27 +16,26 @@
  */
 package kafka.etl;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.zip.CRC32;
 import kafka.api.FetchRequest;
-import kafka.javaapi.MultiFetchResponse;
+import kafka.api.FetchRequestBuilder;
 import kafka.api.OffsetRequest;
 import kafka.common.ErrorMapping;
+import kafka.javaapi.FetchResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
+import kafka.javaapi.message.MessageSet;
 import kafka.message.MessageAndOffset;
-import kafka.message.MessageSet;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.MultipleOutputs;
+
+import java.io.IOException;
+import java.net.URI;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
 
 @SuppressWarnings({ "deprecation"})
 public class KafkaETLContext {
@@ -59,7 +58,8 @@ public class KafkaETLContext {
     protected long _offset = Long.MAX_VALUE; /*current offset*/
     protected long _count; /*current count*/
 
-    protected MultiFetchResponse _response = null;  /*fetch response*/
+    protected int requestId = 0; /* the id of the next fetch request */
+    protected FetchResponse _response = null;  /*fetch response*/
     protected Iterator<MessageAndOffset> _messageIt = null; /*message iterator*/
     protected Iterator<ByteBufferMessageSet> _respIterator = null;
     protected int _retry = 0;
@@ -149,15 +149,19 @@ public class KafkaETLContext {
     public boolean fetchMore () throws IOException {
         if (!hasMore()) return false;
         
-        FetchRequest fetchRequest = 
-            new FetchRequest(_request.getTopic(), _request.getPartition(), _offset, _bufferSize);
-        List<FetchRequest> array = new ArrayList<FetchRequest>();
-        array.add(fetchRequest);
+        FetchRequest fetchRequest = new FetchRequestBuilder()
+                .correlationId(requestId)
+                .clientId(_request.clientId())
+                .addFetch(_request.getTopic(), _request.getPartition(), _offset, _bufferSize)
+                .build();
 
         long tempTime = System.currentTimeMillis();
-        _response = _consumer.multifetch(array);
-        if(_response != null)
-            _respIterator = _response.iterator();
+        _response = _consumer.fetch(fetchRequest);
+        if(_response != null) {
+            _respIterator = new ArrayList<ByteBufferMessageSet>(){{
+                add((ByteBufferMessageSet) _response.messageSet(_request.getTopic(), _request.getPartition()));
+            }}.iterator();
+        }
         _requestTime += (System.currentTimeMillis() - tempTime);
         
         return true;

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java Mon Feb 13 03:58:37 2012
@@ -29,6 +29,7 @@ public class KafkaETLRequest {
     URI _uri;
     int _partition;
     long _offset = DEFAULT_OFFSET;
+    String _clientId = "KafkaHadoopETL";
     
     public KafkaETLRequest() {
         
@@ -83,11 +84,11 @@ public class KafkaETLRequest {
         _offset = offset;
     }
     
-    public String getTopic() { return _topic;}
-    public URI getURI () { return _uri;}
-    public int getPartition() { return _partition;}
-    
-    public long getOffset() { return _offset;}
+    public String getTopic() { return _topic; }
+    public URI getURI () { return _uri; }
+    public int getPartition() { return _partition; }
+    public long getOffset() { return _offset; }
+    public String clientId() { return _clientId; }
 
     public boolean isValidOffset() {
         return _offset >= 0;

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Mon Feb 13 03:58:37 2012
@@ -20,32 +20,149 @@ package kafka.api
 import java.nio._
 import kafka.network._
 import kafka.utils._
+import scala.collection.mutable.{HashMap, Buffer, ListBuffer}
+
+object OffsetDetail {
+
+  def readFrom(buffer: ByteBuffer): OffsetDetail = {
+    val topic = Utils.readShortString(buffer, "UTF-8")
+
+    val partitionsCount = buffer.getInt
+    val partitions = new Array[Int](partitionsCount)
+    for (i <- 0 until partitions.length)
+      partitions(i) = buffer.getInt
+
+    val offsetsCount = buffer.getInt
+    val offsets = new Array[Long](offsetsCount)
+    for (i <- 0 until offsets.length)
+      offsets(i) = buffer.getLong
+
+    val fetchesCount = buffer.getInt
+    val fetchSizes = new Array[Int](fetchesCount)
+    for (i <- 0 until fetchSizes.length)
+      fetchSizes(i) = buffer.getInt
+
+    new OffsetDetail(topic, partitions, offsets, fetchSizes)
+  }
+
+}
+
+case class OffsetDetail(topic: String, partitions: Seq[Int], offsets: Seq[Long], fetchSizes: Seq[Int]) {
+
+  def writeTo(buffer: ByteBuffer) {
+    Utils.writeShortString(buffer, topic, "UTF-8")
+
+    if(partitions.size > Int.MaxValue || offsets.size > Int.MaxValue || fetchSizes.size > Int.MaxValue)
+      throw new IllegalArgumentException("Number of fetches in FetchRequest exceeds " + Int.MaxValue + ".")
+
+    buffer.putInt(partitions.length)
+    partitions.foreach(buffer.putInt(_))
+
+    buffer.putInt(offsets.length)
+    offsets.foreach(buffer.putLong(_))
+
+    buffer.putInt(fetchSizes.length)
+    fetchSizes.foreach(buffer.putInt(_))
+  }
+
+  def sizeInBytes(): Int = {
+    2 + topic.length() +                              // topic string
+      partitions.foldLeft(4)((s, _) => s + 4) +       // each request partition (int)
+      offsets.foldLeft(4)((s, _) => s + 8) +          // each request offset (long)
+      fetchSizes.foldLeft(4)((s,_) => s + 4)          // each request fetch size
+  }
+}
 
 object FetchRequest {
-    
+  val CurrentVersion = 1.shortValue()
+
   def readFrom(buffer: ByteBuffer): FetchRequest = {
-    val topic = Utils.readShortString(buffer, "UTF-8")
-    val partition = buffer.getInt()
-    val offset = buffer.getLong()
-    val size = buffer.getInt()
-    new FetchRequest(topic, partition, offset, size)
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = Utils.readShortString(buffer, "UTF-8")
+    val replicaId = buffer.getInt
+    val maxWait = buffer.getInt
+    val minBytes = buffer.getInt
+    val offsetsCount = buffer.getInt
+    val offsetInfo = new Array[OffsetDetail](offsetsCount)
+    for(i <- 0 until offsetInfo.length)
+      offsetInfo(i) = OffsetDetail.readFrom(buffer)
+
+    new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetInfo)
   }
+
 }
 
-class FetchRequest(val topic: String,
-                   val partition: Int,
-                   val offset: Long, 
-                   val maxSize: Int) extends Request(RequestKeys.Fetch) {
-  
+case class FetchRequest( versionId: Short,
+                         correlationId: Int,
+                         clientId: String,
+                         replicaId: Int,
+                         maxWait: Int,
+                         minBytes: Int,
+                         offsetInfo: Seq[OffsetDetail] ) extends Request(RequestKeys.Fetch) {
+
   def writeTo(buffer: ByteBuffer) {
-    Utils.writeShortString(buffer, topic)
-    buffer.putInt(partition)
-    buffer.putLong(offset)
-    buffer.putInt(maxSize)
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    Utils.writeShortString(buffer, clientId, "UTF-8")
+    buffer.putInt(replicaId)
+    buffer.putInt(maxWait)
+    buffer.putInt(minBytes)
+    buffer.putInt(offsetInfo.size)
+    for(topicDetail <- offsetInfo) {
+      topicDetail.writeTo(buffer)
+    }
   }
-  
-  def sizeInBytes(): Int = 2 + topic.length + 4 + 8 + 4
 
-  override def toString(): String= "FetchRequest(topic:" + topic + ", part:" + partition +" offset:" + offset +
-    " maxSize:" + maxSize + ")"
+  def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes())
+}
+
+class FetchRequestBuilder() {
+  private var correlationId = -1
+  private val versionId = FetchRequest.CurrentVersion
+  private var clientId = ""
+  private var replicaId = -1        // sensible default
+  private var maxWait = -1          // sensible default
+  private var minBytes = -1         // sensible default
+  private val requestMap = new HashMap[String, Tuple3[Buffer[Int], Buffer[Long], Buffer[Int]]]
+
+  def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
+    val topicData = requestMap.getOrElseUpdate(topic, (ListBuffer[Int](), ListBuffer[Long](), ListBuffer[Int]()))
+    topicData._1.append(partition)
+    topicData._2.append(offset)
+    topicData._3.append(fetchSize)
+    this
+  }
+
+  def correlationId(correlationId: Int): FetchRequestBuilder = {
+    this.correlationId = correlationId
+    this
+  }
+
+  def clientId(clientId: String): FetchRequestBuilder = {
+    this.clientId = clientId
+    this
+  }
+
+  def replicaId(replicaId: Int): FetchRequestBuilder = {
+    this.replicaId = replicaId
+    this
+  }
+
+  def maxWait(maxWait: Int): FetchRequestBuilder = {
+    this.maxWait = maxWait
+    this
+  }
+
+  def minBytes(minBytes: Int): FetchRequestBuilder = {
+    this.minBytes = minBytes
+    this
+  }
+
+  def build() = {
+    val offsetDetails = requestMap.map{ topicData =>
+      new OffsetDetail(topicData._1, topicData._2._1.toArray, topicData._2._2.toArray, topicData._2._3.toArray)
+    }
+    new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetDetails.toArray[OffsetDetail])
+  }
 }
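
On the wire, the new request is: versionId (int16), correlationId (int32), clientId (short string), then replicaId, maxWait and minBytes (int32 each), followed by a count of OffsetDetail entries, each holding a topic with parallel arrays of partitions, offsets and fetch sizes. A round-trip sketch of writeTo/readFrom under those definitions, with hypothetical topic and client names:

    import java.nio.ByteBuffer
    import kafka.api.{FetchRequest, FetchRequestBuilder}

    val request = new FetchRequestBuilder()
      .correlationId(7)
      .clientId("demo")                 // hypothetical client id
      .addFetch("events", 0, 0L, 4096)  // hypothetical topic
      .build()

    // sizeInBytes must agree with what writeTo emits for readFrom to decode cleanly.
    val buffer = ByteBuffer.allocate(request.sizeInBytes)
    request.writeTo(buffer)
    buffer.rewind()
    val decoded = FetchRequest.readFrom(buffer)
    assert(decoded.correlationId == request.correlationId)
    assert(decoded.offsetInfo.head.topic == "events")

Note that the RequestKeys.Fetch id is framed by the network layer rather than by writeTo, which is why readFrom begins at versionId.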

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1243407&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Mon Feb 13 03:58:37 2012
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import java.nio.channels.GatheringByteChannel
+import kafka.common.ErrorMapping
+import kafka.message.{MessageSet, ByteBufferMessageSet}
+import kafka.network.{MultiSend, Send}
+import kafka.utils.Utils
+
+object PartitionData {
+  def readFrom(buffer: ByteBuffer): PartitionData = {
+    val partition = buffer.getInt
+    val error = buffer.getInt
+    val initialOffset = buffer.getLong
+    val messageSetSize = buffer.getInt
+    val messageSetBuffer = buffer.slice()
+    messageSetBuffer.limit(messageSetSize)
+    buffer.position(buffer.position + messageSetSize)
+    new PartitionData(partition, error, initialOffset, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error))
+  }
+}
+
+case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, messages: MessageSet) {
+  val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue()
+}
+
+object TopicData {
+
+  def readFrom(buffer: ByteBuffer): TopicData = {
+    val topic = Utils.readShortString(buffer, "UTF-8")
+    val partitionCount = buffer.getInt
+    val partitions = new Array[PartitionData](partitionCount)
+    for(i <- 0 until partitions.length)
+      partitions(i) = PartitionData.readFrom(buffer)
+    new TopicData(topic, partitions.sortBy(_.partition))
+  }
+
+  def findPartition(data: Array[PartitionData], partition: Int): Option[PartitionData] = {
+    if(data == null || data.size == 0)
+      return None
+
+    var (low, high) = (0, data.size-1)
+    while(low <= high) {
+      val mid = (low + high) / 2
+      val found = data(mid)
+      if(found.partition == partition)
+        return Some(found)
+      else if(partition < found.partition)
+        high = mid - 1
+      else
+        low = mid + 1
+    }
+    None
+  }
+}
+
+case class TopicData(topic: String, partitionData: Array[PartitionData]) {
+  val sizeInBytes = 2 + topic.length + partitionData.foldLeft(4)(_ + _.sizeInBytes)
+}
+
+object FetchResponse {
+  def readFrom(buffer: ByteBuffer): FetchResponse = {
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val dataCount = buffer.getInt
+    val data = new Array[TopicData](dataCount)
+    for(i <- 0 until data.length)
+      data(i) = TopicData.readFrom(buffer)
+    new FetchResponse(versionId, correlationId, data)
+  }
+}
+
+case class FetchResponse(versionId: Short, correlationId: Int, data: Array[TopicData])  {
+
+  val sizeInBytes = 2 + 4 + data.foldLeft(4)(_ + _.sizeInBytes)
+
+  lazy val topicMap = data.groupBy(_.topic).mapValues(_.head)
+  
+  def messageSet(topic: String, partition: Int): ByteBufferMessageSet = {
+    val messageSet = topicMap.get(topic) match {
+      case Some(topicData) =>
+        TopicData.findPartition(topicData.partitionData, partition).map(_.messages).getOrElse(MessageSet.Empty)
+      case None =>
+        MessageSet.Empty
+    }
+    messageSet.asInstanceOf[ByteBufferMessageSet]
+  }
+}
+
+// SENDS
+
+class PartitionDataSend(val partitionData: PartitionData) extends Send {
+  private val messageSize = partitionData.messages.sizeInBytes
+  private var messagesSentSize = 0L
+
+  private val buffer = ByteBuffer.allocate(20)
+  buffer.putInt(partitionData.partition)
+  buffer.putInt(partitionData.error)
+  buffer.putLong(partitionData.initialOffset)
+  buffer.putInt(partitionData.messages.sizeInBytes.intValue())
+  buffer.rewind()
+
+  def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
+
+  def writeTo(channel: GatheringByteChannel): Int = {
+    var written = 0
+    if(buffer.hasRemaining)
+      written += channel.write(buffer)
+    if(!buffer.hasRemaining && messagesSentSize < messageSize) {
+      val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt
+      messagesSentSize += bytesSent
+      written += bytesSent
+    }
+    written
+  }
+}
+
+class TopicDataSend(val topicData: TopicData) extends Send {
+  val size = topicData.sizeInBytes
+
+  var sent = 0
+
+  private val buffer = ByteBuffer.allocate(2 + topicData.topic.length() + 4)
+  Utils.writeShortString(buffer, topicData.topic, "UTF-8")
+  buffer.putInt(topicData.partitionData.length)
+  buffer.rewind()
+
+  val sends = new MultiSend(topicData.partitionData.map(new PartitionDataSend(_)).toList) {
+    val expectedBytesToWrite = topicData.partitionData.foldLeft(0)(_ + _.sizeInBytes)
+  }
+
+  def complete = sent >= size
+
+  def writeTo(channel: GatheringByteChannel): Int = {
+    expectIncomplete()
+    var written = 0
+    if(buffer.hasRemaining)
+      written += channel.write(buffer)
+    if(!buffer.hasRemaining && !sends.complete) {
+      written += sends.writeCompletely(channel)
+    }
+    sent += written
+    written
+  }
+}
+
+class FetchResponseSend(val fetchResponse: FetchResponse,
+                        val errorCode: Int = ErrorMapping.NoError) extends Send {
+
+  private val size = fetchResponse.sizeInBytes
+  
+  private var sent = 0
+  
+  private val buffer = ByteBuffer.allocate(16)
+  buffer.putInt(size + 2)
+  buffer.putShort(errorCode.shortValue())
+  buffer.putShort(fetchResponse.versionId)
+  buffer.putInt(fetchResponse.correlationId)
+  buffer.putInt(fetchResponse.data.length)
+  buffer.rewind()
+  
+  val sends = new MultiSend(fetchResponse.data.map(new TopicDataSend(_)).toList) {
+    val expectedBytesToWrite = fetchResponse.data.foldLeft(0)(_ + _.sizeInBytes)
+  }
+
+  def complete = sent >= sendSize
+
+  def writeTo(channel: GatheringByteChannel):Int = {
+    expectIncomplete()
+    var written = 0
+    if(buffer.hasRemaining)
+      written += channel.write(buffer)
+    if(!buffer.hasRemaining && !sends.complete) {
+      written += sends.writeCompletely(channel)
+    }
+    sent += written
+    written
+  }
+
+  def sendSize = 4 + 2 + fetchResponse.sizeInBytes
+
+}
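
TopicData.readFrom sorts partition data on arrival so that findPartition can binary-search it, and FetchResponse.messageSet falls back to MessageSet.Empty for an unknown topic or partition rather than returning null; the asInstanceOf in messageSet relies on MessageSet.Empty itself being a ByteBufferMessageSet. A small sketch over a hand-built response, with illustrative names (real responses come from FetchResponse.readFrom):

    import kafka.api.{FetchRequest, FetchResponse, PartitionData, TopicData}
    import kafka.common.ErrorMapping
    import kafka.message.MessageSet

    val partition0 = new PartitionData(0, ErrorMapping.NoError, 0L, MessageSet.Empty)
    val response = new FetchResponse(FetchRequest.CurrentVersion, 7,
                                     Array(new TopicData("events", Array(partition0))))

    val hit  = response.messageSet("events", 0)   // the (empty) set built above
    val miss = response.messageSet("events", 42)  // unknown partition: empty, not null
    val gone = response.messageSet("other", 0)    // unknown topic: empty as well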

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala Mon Feb 13 03:58:37 2012
@@ -20,8 +20,7 @@ package kafka.api
 object RequestKeys {
   val Produce: Short = 0
   val Fetch: Short = 1
-  val MultiFetch: Short = 2
-  val MultiProduce: Short = 3
-  val Offsets: Short = 4
-  val TopicMetadata: Short = 5
+  val MultiProduce: Short = 2
+  val Offsets: Short = 3
+  val TopicMetadata: Short = 4
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Mon Feb 13 03:58:37 2012
@@ -50,8 +50,7 @@ class ConsumerConfig(props: Properties) 
 
   /** consumer id: generated automatically if not set.
    *  Set this explicitly for only testing purpose. */
-  val consumerId: Option[String] = /** TODO: can be written better in scala 2.8 */
-    if (Utils.getString(props, "consumerid", null) != null) Some(Utils.getString(props, "consumerid")) else None
+  val consumerId: Option[String] = Option(Utils.getString(props, "consumerid", null))
 
   /** the socket timeout for network requests */
   val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", SocketTimeout)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala Mon Feb 13 03:58:37 2012
@@ -17,13 +17,14 @@
 
 package kafka.consumer
 
+import java.io.IOException
 import java.util.concurrent.CountDownLatch
-import kafka.common.ErrorMapping
+import kafka.api.{FetchRequestBuilder, OffsetRequest}
 import kafka.cluster.{Partition, Broker}
-import kafka.api.{OffsetRequest, FetchRequest}
-import org.I0Itec.zkclient.ZkClient
+import kafka.common.ErrorMapping
+import kafka.message.ByteBufferMessageSet
 import kafka.utils._
-import java.io.IOException
+import org.I0Itec.zkclient.ZkClient
 
 class FetcherRunnable(val name: String,
                       val zkClient : ZkClient,
@@ -50,18 +51,26 @@ class FetcherRunnable(val name: String,
       info(name + " start fetching topic: " + infopti.topic + " part: " + infopti.partition.partId + " offset: "
         + infopti.getFetchOffset + " from " + broker.host + ":" + broker.port)
 
+    var reqId = 0
     try {
       while (!stopped) {
-        val fetches = partitionTopicInfos.map(info =>
-          new FetchRequest(info.topic, info.partition.partId, info.getFetchOffset, config.fetchSize))
-
-        trace("fetch request: " + fetches.toString)
-
-        val response = simpleConsumer.multifetch(fetches : _*)
+        // TODO: fix up the max wait and min bytes
+        val builder = new FetchRequestBuilder().
+          correlationId(reqId).
+          clientId(config.consumerId.getOrElse(name)).
+          maxWait(0).
+          minBytes(0)
+        partitionTopicInfos.foreach(pti =>
+          builder.addFetch(pti.topic, pti.partition.partId, pti.getFetchOffset(), config.fetchSize)
+        )
+
+        val fetchRequest = builder.build()
+        trace("fetch request: " + fetchRequest)
+        val response = simpleConsumer.fetch(fetchRequest)
 
         var read = 0L
-
-        for((messages, infopti) <- response.zip(partitionTopicInfos)) {
+        for(infopti <- partitionTopicInfos) {
+          val messages = response.messageSet(infopti.topic, infopti.partition.partId).asInstanceOf[ByteBufferMessageSet]
           try {
             var done = false
             if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) {
@@ -76,8 +85,7 @@ class FetcherRunnable(val name: String,
             }
             if (!done)
               read += infopti.enqueue(messages, infopti.getFetchOffset)
-          }
-          catch {
+          } catch {
             case e1: IOException =>
               // something is wrong with the socket, re-throw the exception to stop the fetcher
               throw e1
@@ -91,6 +99,7 @@ class FetcherRunnable(val name: String,
               throw e2
           }
         }
+        reqId = if(reqId == Int.MaxValue) 0 else reqId + 1
 
         trace("fetched bytes: " + read)
         if(read == 0) {
@@ -98,8 +107,7 @@ class FetcherRunnable(val name: String,
           Thread.sleep(config.fetcherBackoffMs)
         }
       }
-    }
-    catch {
+    } catch {
       case e =>
         if (stopped)
           info("FecherRunnable " + this + " interrupted")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Mon Feb 13 03:58:37 2012
@@ -20,7 +20,6 @@ package kafka.consumer
 import java.net._
 import java.nio.channels._
 import kafka.api._
-import kafka.message._
 import kafka.network._
 import kafka.utils._
 
@@ -72,7 +71,7 @@ class SimpleConsumer(val host: String,
    *  @param request  specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
    *  @return a set of fetched messages
    */
-  def fetch(request: FetchRequest): ByteBufferMessageSet = {
+  def fetch(request: FetchRequest): FetchResponse = {
     lock synchronized {
       val startTime = SystemTime.nanoseconds
       getOrMakeConnection()
@@ -88,51 +87,19 @@ class SimpleConsumer(val host: String,
             channel = connect
             sendRequest(request)
             response = getResponse
-          }catch {
+          } catch {
             case ioe: java.io.IOException => channel = null; throw ioe;
           }
         case e => throw e
       }
-      val endTime = SystemTime.nanoseconds
-      SimpleConsumerStats.recordFetchRequest(endTime - startTime)
-      SimpleConsumerStats.recordConsumptionThroughput(response._1.buffer.limit)
-      new ByteBufferMessageSet(response._1.buffer, request.offset, response._2)
-    }
-  }
+      val fetchResponse = FetchResponse.readFrom(response._1.buffer)
+      val fetchedSize = fetchResponse.sizeInBytes
 
-  /**
-   *  Combine multiple fetch requests in one call.
-   *
-   *  @param fetches  a sequence of fetch requests.
-   *  @return a sequence of fetch responses
-   */
-  def multifetch(fetches: FetchRequest*): MultiFetchResponse = {
-    lock synchronized {
-      val startTime = SystemTime.nanoseconds
-      getOrMakeConnection()
-      var response: Tuple2[Receive,Int] = null
-      try {
-        sendRequest(new MultiFetchRequest(fetches.toArray))
-        response = getResponse
-      } catch {
-        case e : java.io.IOException =>
-          info("Reconnect in multifetch due to socket error: ", e)
-          // retry once
-          try {
-            channel = connect
-            sendRequest(new MultiFetchRequest(fetches.toArray))
-            response = getResponse
-          }catch {
-            case ioe: java.io.IOException => channel = null; throw ioe;
-          }
-        case e => throw e        
-      }
       val endTime = SystemTime.nanoseconds
       SimpleConsumerStats.recordFetchRequest(endTime - startTime)
-      SimpleConsumerStats.recordConsumptionThroughput(response._1.buffer.limit)
+      SimpleConsumerStats.recordConsumptionThroughput(fetchedSize)
 
-      // error code will be set on individual messageset inside MultiFetchResponse
-      new MultiFetchResponse(response._1.buffer, fetches.length, fetches.toArray.map(f => f.offset))
+      fetchResponse
     }
   }
 
@@ -158,7 +125,7 @@ class SimpleConsumer(val host: String,
             channel = connect
             sendRequest(new OffsetRequest(topic, partition, time, maxNumOffsets))
             response = getResponse
-          }catch {
+          } catch {
             case ioe: java.io.IOException => channel = null; throw ioe;
           }
       }
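
For existing callers the visible change is the signature: fetch previously returned the ByteBufferMessageSet of the single requested partition, and multifetch is removed outright. A migration sketch wrapping the new call shape behind the old contract, with hypothetical names:

    import kafka.api.FetchRequestBuilder
    import kafka.consumer.SimpleConsumer
    import kafka.message.ByteBufferMessageSet

    // Old shape: consumer.fetch(new FetchRequest(topic, partition, offset, size))
    // returned the message set directly; the new API goes through FetchResponse.
    def fetchOne(consumer: SimpleConsumer, topic: String, partition: Int,
                 offset: Long, fetchSize: Int): ByteBufferMessageSet = {
      val request = new FetchRequestBuilder()
        .clientId("migrator")                        // hypothetical client id
        .addFetch(topic, partition, offset, fetchSize)
        .build()
      consumer.fetch(request).messageSet(topic, partition)
    }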

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala Mon Feb 13 03:58:37 2012
@@ -32,8 +32,7 @@ private[kafka] object TopicCount extends
         case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
         case None => throw new RuntimeException("error constructing TopicCount : " + jsonString)
       }
-    }
-    catch {
+    } catch {
       case e =>
         error("error parsing consumer json string " + jsonString, e)
         throw e
@@ -46,8 +45,7 @@ private[kafka] object TopicCount extends
 
 private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) {
 
-  def getConsumerThreadIdsPerTopic()
-    : Map[String, Set[String]] = {
+  def getConsumerThreadIdsPerTopic(): Map[String, Set[String]] = {
     val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
     for ((topic, nConsumers) <- topicCountMap) {
       val consumerSet = new mutable.HashSet[String]

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Mon Feb 13 03:58:37 2012
@@ -105,8 +105,7 @@ private[kafka] class ZookeeperConsumerCo
 
   def this(config: ConsumerConfig) = this(config, true)
 
-  def createMessageStreams[T](topicCountMap: Map[String,Int],
-                              decoder: Decoder[T])
+  def createMessageStreams[T](topicCountMap: Map[String,Int], decoder: Decoder[T])
       : Map[String,List[KafkaMessageStream[T]]] = {
     consume(topicCountMap, decoder)
   }
@@ -138,8 +137,7 @@ private[kafka] class ZookeeperConsumerCo
           zkClient.close()
           zkClient = null
         }
-      }
-      catch {
+      } catch {
         case e =>
           fatal("error during consumer connector shutdown", e)
       }
@@ -147,8 +145,7 @@ private[kafka] class ZookeeperConsumerCo
     }
   }
 
-  def consume[T](topicCountMap: scala.collection.Map[String,Int],
-                 decoder: Decoder[T])
+  def consume[T](topicCountMap: scala.collection.Map[String,Int], decoder: Decoder[T])
       : Map[String,List[KafkaMessageStream[T]]] = {
     debug("entering consume ")
     if (topicCountMap == null)
@@ -159,13 +156,13 @@ private[kafka] class ZookeeperConsumerCo
 
     var consumerUuid : String = null
     config.consumerId match {
-      case Some(consumerId) // for testing only
-      => consumerUuid = consumerId
-      case None // generate unique consumerId automatically
-      => val uuid = UUID.randomUUID()
-        consumerUuid = "%s-%d-%s".format(
-          InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
-          uuid.getMostSignificantBits().toHexString.substring(0,8))
+      case Some(consumerId) => // for testing only
+        consumerUuid = consumerId
+      case None => // generate unique consumerId automatically
+        val uuid = UUID.randomUUID()
+        consumerUuid = "%s-%d-%s".format( InetAddress.getLocalHost.getHostName,
+                                          System.currentTimeMillis,
+                                          uuid.getMostSignificantBits().toHexString.substring(0,8) )
     }
     val consumerIdString = config.groupId + "_" + consumerUuid
     val topicCount = new TopicCount(consumerIdString, topicCountMap)
@@ -243,8 +240,7 @@ private[kafka] class ZookeeperConsumerCo
         try {
           updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name,
             newOffset.toString)
-        }
-        catch {
+        } catch {
           case t: Throwable =>
           // log it and let it go
             warn("exception during commitOffsets",  t)
@@ -321,8 +317,7 @@ private[kafka] class ZookeeperConsumerCo
                                             ConsumerConfig.SocketBufferSize)
       val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, earliestOrLatest, 1)
       producedOffset = offsets(0)
-    }
-    catch {
+    } catch {
       case e =>
         error("error in earliestOrLatestOffset() ", e)
     }
@@ -419,8 +414,7 @@ private[kafka] class ZookeeperConsumerCo
           val cluster = getCluster(zkClient)
           try {
             done = rebalance(cluster)
-          }
-          catch {
+          } catch {
             case e =>
               /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
                * For example, a ZK node can disappear between the time we get all children and the time we try to get
@@ -433,7 +427,7 @@ private[kafka] class ZookeeperConsumerCo
           info("end rebalancing consumer " + consumerIdString + " try #" + i)
           if (done) {
             return
-          }else {
+          } else {
               /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
                * clear the cache */
               info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
@@ -529,7 +523,7 @@ private[kafka] class ZookeeperConsumerCo
         oldConsumersPerTopicMap = consumersPerTopicMap
         updateFetcher(cluster, kafkaMessageStreams)
         true
-      }else
+      } else
         false
     }
 
@@ -611,8 +605,7 @@ private[kafka] class ZookeeperConsumerCo
           createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
           info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
           true
-        }
-        catch {
+        } catch {
           case e: ZkNodeExistsException =>
             // The node hasn't been deleted by the original owner. So wait a bit and retry.
             info("waiting for the partition ownership to be deleted: " + partition)

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala?rev=1243407&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala Mon Feb 13 03:58:37 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.api.TopicData
+
+
+class FetchResponse( val versionId: Short,
+                     val correlationId: Int,
+                     val data: Array[TopicData] ) {
+
+  private val underlying = new kafka.api.FetchResponse(versionId, correlationId, data)
+
+  def messageSet(topic: String, partition: Int): kafka.javaapi.message.ByteBufferMessageSet = {
+    import Implicits._
+    underlying.messageSet(topic, partition)
+  }
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala Mon Feb 13 03:58:37 2012
@@ -28,9 +28,6 @@ private[javaapi] object Implicits extend
                                                    messageSet.getErrorCode)
   }
 
-  implicit def toMultiFetchResponse(response: kafka.javaapi.MultiFetchResponse): kafka.api.MultiFetchResponse =
-    response.underlying
-
-  implicit def toJavaMultiFetchResponse(response: kafka.api.MultiFetchResponse): kafka.javaapi.MultiFetchResponse =
-    new kafka.javaapi.MultiFetchResponse(response.buffer, response.numSets, response.offsets)
+  implicit def toJavaFetchResponse(response: kafka.api.FetchResponse): kafka.javaapi.FetchResponse =
+    new kafka.javaapi.FetchResponse(response.versionId, response.correlationId, response.data)
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala Mon Feb 13 03:58:37 2012
@@ -17,10 +17,9 @@
 
 package kafka.javaapi.consumer
 
-import kafka.utils.threadsafe
-import kafka.javaapi.message.ByteBufferMessageSet
-import kafka.javaapi.MultiFetchResponse
 import kafka.api.FetchRequest
+import kafka.javaapi.FetchResponse
+import kafka.utils.threadsafe
 
 /**
  * A consumer of kafka messages
@@ -38,24 +37,12 @@ class SimpleConsumer(val host: String,
    *  @param request  specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
    *  @return a set of fetched messages
    */
-  def fetch(request: FetchRequest): ByteBufferMessageSet = {
+  def fetch(request: FetchRequest): FetchResponse = {
     import kafka.javaapi.Implicits._
     underlying.fetch(request)
   }
 
   /**
-   *  Combine multiple fetch requests in one call.
-   *
-   *  @param fetches  a sequence of fetch requests.
-   *  @return a sequence of fetch responses
-   */
-  def multifetch(fetches: java.util.List[FetchRequest]): MultiFetchResponse = {
-    import scala.collection.JavaConversions._
-    import kafka.javaapi.Implicits._
-    underlying.multifetch(asBuffer(fetches): _*)
-  }
-
-  /**
    *  Get a list of valid offsets (up to maxSize) before the given time.
    *  The result is a list of offsets, in descending order.
    *

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala Mon Feb 13 03:58:37 2012
@@ -28,7 +28,7 @@ import kafka.utils._
 @nonthreadsafe
 private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive with Logging {
   
-  private val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4)
+  private val sizeBuffer = ByteBuffer.allocate(4)
   private var contentBuffer: ByteBuffer = null
   
   def this() = this(Int.MaxValue)
@@ -78,12 +78,10 @@ private[kafka] class BoundedByteBufferRe
     var buffer: ByteBuffer = null
     try {
       buffer = ByteBuffer.allocate(size)
-    }
-    catch {
-      case e: OutOfMemoryError => {
+    } catch {
+      case e: OutOfMemoryError =>
         error("OOME with size " + size, e)
         throw e
-      }
       case e2 =>
         throw e2
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Mon Feb 13 03:58:37 2012
@@ -21,8 +21,8 @@ import java.util.concurrent._
 
 object RequestChannel { 
   val AllDone = new Request(1, 2, null, 0)
-  case class Request(val processor: Int, requestKey: Any, request: Receive, start: Long)
-  case class Response(val processor: Int, requestKey: Any, response: Send, start: Long, ellapsed: Long)
+  case class Request(processor: Int, requestKey: Any, request: Receive, start: Long)
+  case class Response(processor: Int, requestKey: Any, response: Send, start: Long, elapsed: Long)
 }
 
 class RequestChannel(val numProcessors: Int, val queueSize: Int) { 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala Mon Feb 13 03:58:37 2012
@@ -50,7 +50,7 @@ class SocketServerStats(val monitorDurat
     requestTypeId match {
       case r if r == RequestKeys.Produce || r == RequestKeys.MultiProduce =>
         produceTimeStats.recordRequestMetric(durationNs)
-      case r if r == RequestKeys.Fetch || r == RequestKeys.MultiFetch =>
+      case r if r == RequestKeys.Fetch =>
         fetchTimeStats.recordRequestMetric(durationNs)
       case _ => /* not collecting; let go */
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala Mon Feb 13 03:58:37 2012
@@ -66,14 +66,15 @@ trait Receive extends Transmission {
 trait Send extends Transmission {
     
   def writeTo(channel: GatheringByteChannel): Int
-  
+
   def writeCompletely(channel: GatheringByteChannel): Int = {
-    var written = 0
+    var totalWritten = 0
     while(!complete) {
-      written = writeTo(channel)
+      val written = writeTo(channel)
       trace(written + " bytes written.")
+      totalWritten += written
     }
-    written
+    totalWritten
   }
     
 }
@@ -99,9 +100,9 @@ abstract class MultiSend[S <: Send](val 
     if (current == Nil) {
       if (totalWritten != expectedBytesToWrite)
         error("mismatch in sending bytes over socket; expected: " + expectedBytesToWrite + " actual: " + totalWritten)
-      return true
+      true
+    } else {
+      false
     }
-    else
-      return false
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Mon Feb 13 03:58:37 2012
@@ -17,17 +17,17 @@
 
 package kafka.server
 
-import org.apache.log4j.Logger
-import kafka.log._
-import kafka.network._
-import kafka.message._
+import java.io.IOException
+import java.lang.IllegalStateException
+import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
 import kafka.common.ErrorMapping
-import java.io.IOException
+import kafka.log._
+import kafka.message._
+import kafka.network._
 import kafka.utils.{SystemTime, Logging}
-import collection.mutable.ListBuffer
-import kafka.admin.{CreateTopicCommand, AdminUtils}
-import java.lang.IllegalStateException
+import org.apache.log4j.Logger
+import scala.collection.mutable.ListBuffer
 
 /**
  * Logic to handle the various Kafka requests
@@ -39,13 +39,12 @@ class KafkaApis(val logManager: LogManag
   def handle(receive: Receive): Option[Send] = { 
     val apiId = receive.buffer.getShort() 
     apiId match {
-        case RequestKeys.Produce => handleProducerRequest(receive)
-        case RequestKeys.Fetch => handleFetchRequest(receive)
-        case RequestKeys.MultiFetch => handleMultiFetchRequest(receive)
-        case RequestKeys.MultiProduce => handleMultiProducerRequest(receive)
-        case RequestKeys.Offsets => handleOffsetRequest(receive)
-        case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive)
-        case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
+      case RequestKeys.Produce => handleProducerRequest(receive)
+      case RequestKeys.Fetch => handleFetchRequest(receive)
+      case RequestKeys.MultiProduce => handleMultiProducerRequest(receive)
+      case RequestKeys.Offsets => handleOffsetRequest(receive)
+      case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive)
+      case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
     }
   }
 
@@ -92,34 +91,37 @@ class KafkaApis(val logManager: LogManag
     val fetchRequest = FetchRequest.readFrom(request.buffer)
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Fetch request " + fetchRequest.toString)
-    Some(readMessageSet(fetchRequest))
-  }
-  
-  def handleMultiFetchRequest(request: Receive): Option[Send] = {
-    val multiFetchRequest = MultiFetchRequest.readFrom(request.buffer)
-    if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Multifetch request")
-    multiFetchRequest.fetches.foreach(req => requestLogger.trace(req.toString))
-    var responses = multiFetchRequest.fetches.map(fetch =>
-        readMessageSet(fetch)).toList
-    
-    Some(new MultiMessageSetSend(responses))
+
+    val fetchedData = new ListBuffer[TopicData]()
+    var error: Int = ErrorMapping.NoError
+
+    for(offsetDetail <- fetchRequest.offsetInfo) {
+      val info = new ListBuffer[PartitionData]()
+      val topic = offsetDetail.topic
+      val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
+      for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
+        val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
+          case Left(err) => error = err; new PartitionData(partition, err, offset, MessageSet.Empty)
+          case Right(messages) => new PartitionData(partition, ErrorMapping.NoError, offset, messages)
+        }
+        info.append(partitionInfo)
+      }
+      fetchedData.append(new TopicData(topic, info.toArray))
+    }
+    val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, fetchedData.toArray )
+    Some(new FetchResponseSend(response, error))
   }
 
-  private def readMessageSet(fetchRequest: FetchRequest): MessageSetSend = {
-    var  response: MessageSetSend = null
+  private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Int, MessageSet] = {
+    var response: Either[Int, MessageSet] = null
     try {
-      trace("Fetching log segment for topic = " + fetchRequest.topic + " and partition = " + fetchRequest.partition)
-      val log = logManager.getLog(fetchRequest.topic, fetchRequest.partition)
-      if (log != null)
-        response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize))
-      else
-        response = new MessageSetSend()
-    }
-    catch {
+      trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
+      val log = logManager.getLog(topic, partition)
+      response = Right(if(log != null) log.read(offset, maxSize) else MessageSet.Empty)
+    } catch {
       case e =>
-        error("error when processing request " + fetchRequest, e)
-        response=new MessageSetSend(MessageSet.Empty, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+        error("error when processing request " + (topic, partition, offset, maxSize), e)
+        response = Left(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
     }
     response
   }
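
On the broker side, handleFetchRequest now reads each requested (partition, offset, fetchSize) triple independently, so a failure maps to an error code on that one partition via readMessageSet's Either[Int, MessageSet] instead of failing the whole response. A reduced sketch of that per-partition containment, with a hypothetical stand-in for the log read:

    import kafka.api.PartitionData
    import kafka.common.ErrorMapping
    import kafka.message.MessageSet

    // Hypothetical stand-in for KafkaApis.readMessageSet.
    def readOne(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Int, MessageSet] =
      try {
        Right(MessageSet.Empty)  // a real broker reads the log segment here
      } catch {
        case e: Throwable => Left(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
      }

    // A failed partition carries its error code; successful ones are unaffected.
    val partitionData = readOne("events", 0, 0L, 4096) match {
      case Left(err)       => new PartitionData(0, err, 0L, MessageSet.Empty)
      case Right(messages) => new PartitionData(0, ErrorMapping.NoError, 0L, messages)
    }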

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Mon Feb 13 03:58:37 2012
@@ -35,9 +35,9 @@ class KafkaRequestHandler(val requestCha
         case Some(send) => { 
           val resp = new RequestChannel.Response(processor = req.processor, 
                                                  requestKey = req.requestKey, 
-						 response = send, 
-						 start = req.start, 
-						 ellapsed = -1)
+                                                 response = send,
+                                                 start = req.start,
+                                                 elapsed = -1)
           requestChannel.sendResponse(resp)
           trace("Processor " + Thread.currentThread.getName + " sent response " + resp)
         }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Mon Feb 13 03:58:37 2012
@@ -19,7 +19,7 @@ package kafka.tools
 
 import java.net.URI
 import joptsimple._
-import kafka.api.FetchRequest
+import kafka.api.FetchRequestBuilder
 import kafka.utils._
 import kafka.consumer._
 
@@ -54,6 +54,11 @@ object SimpleConsumerShell extends Loggi
                            .describedAs("fetchsize")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1000000)
+    val clientIdOpt = parser.accepts("clientId", "The ID of this client.")
+                           .withOptionalArg
+                           .describedAs("clientId")
+                           .ofType(classOf[String])
+                           .defaultsTo("SimpleConsumerShell")
     val printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator")
                            .withOptionalArg
                            .describedAs("print offsets")
@@ -79,7 +84,8 @@ object SimpleConsumerShell extends Loggi
     val topic = options.valueOf(topicOpt)
     val partition = options.valueOf(partitionOpt).intValue
     val startingOffset = options.valueOf(offsetOpt).longValue
-    val fetchsize = options.valueOf(fetchsizeOpt).intValue
+    val fetchSize = options.valueOf(fetchsizeOpt).intValue
+    val clientId = options.valueOf(clientIdOpt).toString
     val printOffsets = if(options.has(printOffsetOpt)) true else false
     val printMessages = if(options.has(printMessageOpt)) true else false
 
@@ -87,22 +93,27 @@ object SimpleConsumerShell extends Loggi
     val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 64*1024)
     val thread = Utils.newThread("kafka-consumer", new Runnable() {
       def run() {
+        var reqId = 0
         var offset = startingOffset
         while(true) {
-          val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize)
-          val messageSets = consumer.multifetch(fetchRequest)
-          for (messages <- messageSets) {
-            debug("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset)
-            var consumed = 0
-            for(messageAndOffset <- messages) {
-              if(printMessages)
-                info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
-              offset = messageAndOffset.offset
-              if(printOffsets)
-                info("next offset = " + offset)
-              consumed += 1
-            }
+          val fetchRequest = new FetchRequestBuilder()
+            .correlationId(reqId)
+            .clientId(clientId)
+            .addFetch(topic, partition, offset, fetchSize)
+            .build()
+          val fetchResponse = consumer.fetch(fetchRequest)
+          val messageSet = fetchResponse.messageSet(topic, partition)
+          debug("fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
+          var consumed = 0
+          for(messageAndOffset <- messageSet) {
+            if(printMessages)
+              info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
+            offset = messageAndOffset.offset
+            if(printOffsets)
+              info("next offset = " + offset)
+            consumed += 1
           }
+          reqId += 1
         }
       }
     }, false);
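
The rewritten loop is the basic consumption pattern under the new request format: build a single FetchRequest with a correlation id and client id, issue it through SimpleConsumer.fetch, and look the message set up by topic and partition in the FetchResponse. A condensed sketch of the same pattern (the broker address, client id, and topic below are placeholder values):

    import kafka.api.FetchRequestBuilder
    import kafka.consumer.SimpleConsumer
    import kafka.utils.Utils

    // Placeholder broker address and topic.
    val consumer = new SimpleConsumer("localhost", 9092, 10000, 64*1024)
    var offset = 0L
    val request = new FetchRequestBuilder()
      .correlationId(0)
      .clientId("example-client")
      .addFetch("my-topic", 0, offset, 1000000)
      .build()
    val response = consumer.fetch(request)
    for (messageAndOffset <- response.messageSet("my-topic", 0)) {
      println(Utils.toString(messageAndOffset.message.payload, "UTF-8"))
      offset = messageAndOffset.offset  // next fetch starts here
    }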

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala Mon Feb 13 03:58:37 2012
@@ -17,15 +17,17 @@
 
 package kafka.integration
 
-import kafka.server.KafkaConfig
-import org.scalatest.junit.JUnit3Suite
-import org.apache.log4j.Logger
+import junit.framework.Assert._
 import java.util.Properties
+
+import kafka.api.{FetchRequestBuilder, OffsetRequest}
 import kafka.consumer.SimpleConsumer
-import kafka.api.{OffsetRequest, FetchRequest}
-import junit.framework.Assert._
+import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 
+import org.apache.log4j.Logger
+import org.scalatest.junit.JUnit3Suite
+
 class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness {
 
   val topic = "MagicByte0"
@@ -62,9 +64,10 @@ class BackwardsCompatibilityTest extends
     var messageCount: Int = 0
 
     while(fetchOffset < lastOffset(0)) {
-      val fetched = simpleConsumer.fetch(new FetchRequest(topic, 0, fetchOffset, 10000))
-      fetched.foreach(m => fetchOffset = m.offset)
-      messageCount += fetched.size
+      val fetched = simpleConsumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, fetchOffset, 10000).build())
+      val fetchedMessages = fetched.messageSet(topic, 0)
+      fetchedMessages.foreach(m => fetchOffset = m.offset)
+      messageCount += fetchedMessages.size
     }
     assertEquals(100, messageCount)
   }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Mon Feb 13 03:58:37 2012
@@ -17,15 +17,15 @@
 
 package kafka.integration
 
-import scala.collection._
+import kafka.api.{FetchRequestBuilder, ProducerRequest}
 import kafka.common.OffsetOutOfRangeException
-import kafka.api.{ProducerRequest, FetchRequest}
+import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
 import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
+import kafka.utils.{TestUtils, Utils}
+import kafka.zk.ZooKeeperTestHarness
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
-import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
-import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{TestUtils, Utils}
+import scala.collection._
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -65,54 +65,60 @@ class LazyInitProducerTest extends JUnit
                                         new Message("hello".getBytes()), new Message("there".getBytes()))
     producer.send(topic, sent)
     sent.getBuffer.rewind
-    var fetched: ByteBufferMessageSet = null
-    while(fetched == null || fetched.validBytes == 0)
-      fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
-    TestUtils.checkEquals(sent.iterator, fetched.iterator)
+
+    var fetchedMessage: ByteBufferMessageSet = null
+    while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
+      val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+      fetchedMessage = fetched.messageSet(topic, 0)
+    }
+    TestUtils.checkEquals(sent.iterator, fetchedMessage.iterator)
 
     // send an invalid offset
     try {
-      val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000))
-      fetchedWithError.iterator
+      val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
+      fetchedWithError.messageSet(topic, 0).iterator
       fail("Expected an OffsetOutOfRangeException exception to be thrown")
-    }
-    catch {
+    } catch {
       case e: OffsetOutOfRangeException => 
     }
   }
 
   def testProduceAndMultiFetch() {
-    // send some messages
-    val topics = List("test1", "test2", "test3");
+    // send some messages, with topics deliberately not in sorted order
+    val topicPartitions = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
       val messages = new mutable.HashMap[String, ByteBufferMessageSet]
-      val fetches = new mutable.ArrayBuffer[FetchRequest]
-      for(topic <- topics) {
+      val builder = new FetchRequestBuilder()
+      for( (topic, partition) <- topicPartitions) {
         val set = new ByteBufferMessageSet(NoCompressionCodec,
                                            new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
-        messages += topic -> set
         producer.send(topic, set)
         set.getBuffer.rewind
-        fetches += new FetchRequest(topic, 0, 0, 10000)
+        messages += topic -> set
+        builder.addFetch(topic, partition, 0, 10000)
       }
 
       // wait a bit for produced message to be available
       Thread.sleep(200)
-      val response = consumer.multifetch(fetches: _*)
-      for((topic, resp) <- topics.zip(response.toList))
-        TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+      val request = builder.build()
+      val response = consumer.fetch(request)
+      for( (topic, partition) <- topicPartitions) {
+        val fetched = response.messageSet(topic, partition)
+        TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
+      }
     }
 
     {
       // send some invalid offsets
-      val fetches = new mutable.ArrayBuffer[FetchRequest]
-      for(topic <- topics)
-        fetches += new FetchRequest(topic, 0, -1, 10000)
-
-      val responses = consumer.multifetch(fetches: _*)
-      for(resp <- responses) {
+      val builder = new FetchRequestBuilder()
+      for( (topic, partition) <- topicPartitions )
+        builder.addFetch(topic, partition, -1, 10000)
+
+      val request = builder.build()
+      val responses = consumer.fetch(request)
+      for( (topic, partition) <- topicPartitions ) {
         try {
-          resp.iterator
+          responses.messageSet(topic, partition).iterator
           fail("Expected an OffsetOutOfRangeException exception to be thrown")
         } catch {
           case e: OffsetOutOfRangeException => 
@@ -125,14 +131,14 @@ class LazyInitProducerTest extends JUnit
     // send some messages
     val topics = List("test1", "test2", "test3");
     val messages = new mutable.HashMap[String, ByteBufferMessageSet]
-    val fetches = new mutable.ArrayBuffer[FetchRequest]
+    val builder = new FetchRequestBuilder()
     var produceList: List[ProducerRequest] = Nil
     for(topic <- topics) {
       val set = new ByteBufferMessageSet(NoCompressionCodec,
                                          new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
       messages += topic -> set
       produceList ::= new ProducerRequest(topic, 0, set)
-      fetches += new FetchRequest(topic, 0, 0, 10000)
+      builder.addFetch(topic, 0, 0, 10000)
     }
     producer.multiSend(produceList.toArray)
 
@@ -141,23 +147,26 @@ class LazyInitProducerTest extends JUnit
 
     // wait a bit for produced message to be available
     Thread.sleep(200)
-    val response = consumer.multifetch(fetches: _*)
-    for((topic, resp) <- topics.zip(response.toList))
-      TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+    val request = builder.build()
+    val response = consumer.fetch(request)
+    for(topic <- topics) {
+      val fetched = response.messageSet(topic, 0)
+      TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
+    }
   }
 
   def testMultiProduceResend() {
     // send some messages
     val topics = List("test1", "test2", "test3");
     val messages = new mutable.HashMap[String, ByteBufferMessageSet]
-    val fetches = new mutable.ArrayBuffer[FetchRequest]
+    val builder = new FetchRequestBuilder()
     var produceList: List[ProducerRequest] = Nil
     for(topic <- topics) {
       val set = new ByteBufferMessageSet(NoCompressionCodec,
                                          new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
       messages += topic -> set
       produceList ::= new ProducerRequest(topic, 0, set)
-      fetches += new FetchRequest(topic, 0, 0, 10000)
+      builder.addFetch(topic, 0, 0, 10000)
     }
     producer.multiSend(produceList.toArray)
 
@@ -169,11 +178,13 @@ class LazyInitProducerTest extends JUnit
 
     // wait a bit for produced message to be available
     Thread.sleep(750)
-    val response = consumer.multifetch(fetches: _*)
-    for((topic, resp) <- topics.zip(response.toList))
+    val request = builder.build()
+    val response = consumer.fetch(request)
+    for(topic <- topics) {
+      val topicMessages = response.messageSet(topic, 0)
       TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).map(m => m.message).iterator,
                                                       messages(topic).map(m => m.message).iterator),
-                            resp.map(m => m.message).iterator)
-//      TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).iterator, messages(topic).iterator), resp.iterator)
+                            topicMessages.iterator.map(_.message))
+    }
   }
 }
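
Note the shift away from positional results in this test: the removed multifetch paired each response with its request by zip order, while the new FetchResponse is keyed, so results are retrieved by (topic, partition) regardless of the order in which addFetch was called. Error codes also surface lazily, as the invalid-offset case shows: the fetch call itself succeeds, and the mapped exception is only thrown when the message set is iterated. A short sketch combining both points (assumes a connected SimpleConsumer named consumer, as in the shell example above; topic names are placeholders):

    import kafka.api.FetchRequestBuilder
    import kafka.common.OffsetOutOfRangeException

    // One request covering several topic/partition pairs.
    val builder = new FetchRequestBuilder()
    for ((topic, partition) <- List(("test1", 0), ("test2", 0)))
      builder.addFetch(topic, partition, 0, 10000)
    val response = consumer.fetch(builder.build())

    // Lookup is by key, not by position in the request.
    val messages = response.messageSet("test2", 0)
    try {
      messages.iterator  // a bad offset throws here, not at fetch time
    } catch {
      case e: OffsetOutOfRangeException => // reset the offset and retry
    }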

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala Mon Feb 13 03:58:37 2012
@@ -20,16 +20,14 @@ package kafka.log
 import kafka.server.KafkaConfig
 import java.io.File
 import java.nio.ByteBuffer
-import kafka.utils.Utils
-import kafka.api.FetchRequest
+import kafka.api.FetchRequestBuilder
 import kafka.common.InvalidMessageSizeException
-import kafka.utils.TestUtils
 import kafka.consumer.{ZookeeperConsumerConnector, ConsumerConfig}
+import kafka.integration.{KafkaServerTestHarness, ProducerConsumerTestHarness}
+import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.utils.{Utils, TestUtils}
 import org.scalatest.junit.JUnit3Suite
-import kafka.integration.ProducerConsumerTestHarness
-import kafka.integration.KafkaServerTestHarness
 import org.apache.log4j.{Logger, Level}
-import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
 
 class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness {
   val port = TestUtils.choosePort
@@ -65,23 +63,21 @@ class LogCorruptionTest extends JUnit3Su
 
     Thread.sleep(500)
     // test SimpleConsumer
-    val messageSet = consumer.fetch(new FetchRequest(topic, partition, 0, 10000))
+    val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, 10000).build())
     try {
-      for (msg <- messageSet)
+      for (msg <- response.messageSet(topic, partition))
         fail("shouldn't reach here in SimpleConsumer since log file is corrupted.")
       fail("shouldn't reach here in SimpleConsumer since log file is corrupted.")
-    }
-    catch {
+    } catch {
       case e: InvalidMessageSizeException => "This is good"
     }
 
-    val messageSet2 = consumer.fetch(new FetchRequest(topic, partition, 0, 10000))
+    val response2 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, 10000).build())
     try {
-      for (msg <- messageSet2)
+      for (msg <- response2.messageSet(topic, partition))
         fail("shouldn't reach here in SimpleConsumer since log file is corrupted.")
       fail("shouldn't reach here in SimpleConsumer since log file is corrupted.")
-    }
-    catch {
+    } catch {
       case e: InvalidMessageSizeException => println("This is good")
     }
 
@@ -95,8 +91,7 @@ class LogCorruptionTest extends JUnit3Su
       for (message <- messageStreams(0))
         fail("shouldn't reach here in ZookeeperConsumer since log file is corrupted.")
       fail("shouldn't reach here in ZookeeperConsumer since log file is corrupted.")
-    }
-    catch {
+    } catch {
       case e: InvalidMessageSizeException => "This is good"
       case e: Exception => "This is not bad too !"
     }
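
The same lazy surfacing applies to corrupted log segments: both fetches above succeed, and InvalidMessageSizeException is raised only once the returned message set is traversed, so a defensive consumer wraps the iteration rather than the fetch call. A sketch, with consumer, topic, and partition as in this test and handle as a hypothetical per-message callback:

    import kafka.api.FetchRequestBuilder
    import kafka.common.InvalidMessageSizeException

    val response = consumer.fetch(
      new FetchRequestBuilder().addFetch(topic, partition, 0, 10000).build())
    try {
      for (msg <- response.messageSet(topic, partition))
        handle(msg)  // hypothetical per-message callback
    } catch {
      case e: InvalidMessageSizeException =>
        // corrupted data on disk: skip the range or alert; retrying won't help
    }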

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Mon Feb 13 03:58:37 2012
@@ -18,18 +18,19 @@
 package kafka.integration
 
 import scala.collection._
-import junit.framework.Assert._
-import kafka.api.{ProducerRequest, FetchRequest}
-import kafka.common.{OffsetOutOfRangeException, InvalidPartitionException}
-import kafka.server.{KafkaRequestHandler, KafkaConfig}
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
+import java.io.File
 import java.util.Properties
+import junit.framework.Assert._
+import kafka.common.{ErrorMapping, OffsetOutOfRangeException, InvalidPartitionException}
+import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet}
 import kafka.producer.{ProducerData, Producer, ProducerConfig}
 import kafka.serializer.StringDecoder
-import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet}
-import java.io.File
+import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import kafka.utils.TestUtils
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
+import java.nio.ByteBuffer
+import kafka.api.{FetchRequest, FetchRequestBuilder, ProducerRequest}
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -39,11 +40,28 @@ class PrimitiveApiTest extends JUnit3Sui
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props) {
-                 override val flushInterval = 1
-               }
+    override val flushInterval = 1
+  }
   val configs = List(config)
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
+  def testFetchRequestCanProperlySerialize() {
+    val request = new FetchRequestBuilder()
+      .correlationId(100)
+      .clientId("test-client")
+      .maxWait(10001)
+      .minBytes(4444)
+      .addFetch("topic1", 0, 0, 10000)
+      .addFetch("topic2", 1, 1024, 9999)
+      .addFetch("topic1", 1, 256, 444)
+      .build()
+    val serializedBuffer = ByteBuffer.allocate(request.sizeInBytes)
+    request.writeTo(serializedBuffer)
+    serializedBuffer.rewind()
+    val deserializedRequest = FetchRequest.readFrom(serializedBuffer)
+    assertEquals(request, deserializedRequest)
+  }
+  
   def testDefaultEncoderProducerAndFetch() {
     val topic = "test-topic"
     val props = new Properties()
@@ -55,10 +73,18 @@ class PrimitiveApiTest extends JUnit3Sui
     stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
     Thread.sleep(200)
 
-    var fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
-    assertTrue(fetched.iterator.hasNext)
+    val request = new FetchRequestBuilder()
+      .correlationId(100)
+      .clientId("test-client")
+      .addFetch(topic, 0, 0, 10000)
+      .build()
+    val fetched = consumer.fetch(request)
+    assertEquals("Returned correlationId doesn't match that in request.", 100, fetched.correlationId)
 
-    val fetchedMessageAndOffset = fetched.iterator.next
+    val messageSet = fetched.messageSet(topic, 0)
+    assertTrue(messageSet.iterator.hasNext)
+
+    val fetchedMessageAndOffset = messageSet.head
     val stringDecoder = new StringDecoder
     val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message)
     assertEquals("test-message", fetchedStringMessage)
@@ -76,10 +102,11 @@ class PrimitiveApiTest extends JUnit3Sui
     stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
     Thread.sleep(200)
 
-    var fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
-    assertTrue(fetched.iterator.hasNext)
+    var fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+    val messageSet = fetched.messageSet(topic, 0)
+    assertTrue(messageSet.iterator.hasNext)
 
-    val fetchedMessageAndOffset = fetched.iterator.next
+    val fetchedMessageAndOffset = messageSet.head
     val stringDecoder = new StringDecoder
     val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message)
     assertEquals("test-message", fetchedStringMessage)
@@ -87,24 +114,27 @@ class PrimitiveApiTest extends JUnit3Sui
 
   def testProduceAndMultiFetch() {
     // send some messages
-    val topics = List("test1", "test2", "test3");
+    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
       val messages = new mutable.HashMap[String, ByteBufferMessageSet]
-      val fetches = new mutable.ArrayBuffer[FetchRequest]
-      for(topic <- topics) {
+      val builder = new FetchRequestBuilder()
+      for( (topic, partition) <- topics) {
         val set = new ByteBufferMessageSet(NoCompressionCodec,
                                            new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
         messages += topic -> set
         producer.send(topic, set)
         set.getBuffer.rewind
-        fetches += new FetchRequest(topic, 0, 0, 10000)
+        builder.addFetch(topic, partition, 0, 10000)
       }
 
       // wait a bit for produced message to be available
       Thread.sleep(700)
-      val response = consumer.multifetch(fetches: _*)
-      for((topic, resp) <- topics.zip(response.toList))
-        TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+      val request = builder.build()
+      val response = consumer.fetch(request)
+      for( (topic, partition) <- topics) {
+        val fetched = response.messageSet(topic, partition)
+        TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
+      }
     }
 
     // temporarily set request handler logger to a higher level
@@ -112,34 +142,34 @@ class PrimitiveApiTest extends JUnit3Sui
 
     {
       // send some invalid offsets
-      val fetches = new mutable.ArrayBuffer[FetchRequest]
-      for(topic <- topics)
-        fetches += new FetchRequest(topic, 0, -1, 10000)
+      val builder = new FetchRequestBuilder()
+      for( (topic, partition) <- topics)
+        builder.addFetch(topic, partition, -1, 10000)
 
       try {
-        val responses = consumer.multifetch(fetches: _*)
-        for(resp <- responses)
-          resp.iterator
-        fail("expect exception")
-      }
-      catch {
+        val request = builder.build()
+        val response = consumer.fetch(request)
+        for( (topic, partition) <- topics)
+          response.messageSet(topic, partition).iterator
+        fail("Expected exception when fetching message with invalid offset")
+      } catch {
         case e: OffsetOutOfRangeException => "this is good"
       }
     }    
 
     {
       // send some invalid partitions
-      val fetches = new mutable.ArrayBuffer[FetchRequest]
-      for(topic <- topics)
-        fetches += new FetchRequest(topic, -1, 0, 10000)
+      val builder = new FetchRequestBuilder()
+      for( (topic, partition) <- topics)
+        builder.addFetch(topic, -1, 0, 10000)
 
       try {
-        val responses = consumer.multifetch(fetches: _*)
-        for(resp <- responses)
-          resp.iterator
-        fail("expect exception")
-      }
-      catch {
+        val request = builder.build()
+        val response = consumer.fetch(request)
+        for( (topic, partition) <- topics)
+          response.messageSet(topic, -1).iterator
+        fail("Expected exception when fetching message with invalid partition")
+      } catch {
         case e: InvalidPartitionException => "this is good"
       }
     }
@@ -150,24 +180,27 @@ class PrimitiveApiTest extends JUnit3Sui
 
   def testProduceAndMultiFetchWithCompression() {
     // send some messages
-    val topics = List("test1", "test2", "test3");
+    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
       val messages = new mutable.HashMap[String, ByteBufferMessageSet]
-      val fetches = new mutable.ArrayBuffer[FetchRequest]
-      for(topic <- topics) {
+      val builder = new FetchRequestBuilder()
+      for( (topic, partition) <- topics) {
         val set = new ByteBufferMessageSet(DefaultCompressionCodec,
                                            new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
         messages += topic -> set
         producer.send(topic, set)
         set.getBuffer.rewind
-        fetches += new FetchRequest(topic, 0, 0, 10000)
+        builder.addFetch(topic, partition, 0, 10000)
       }
 
       // wait a bit for produced message to be available
       Thread.sleep(200)
-      val response = consumer.multifetch(fetches: _*)
-      for((topic, resp) <- topics.zip(response.toList))
-        TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+      val request = builder.build()
+      val response = consumer.fetch(request)
+      for( (topic, partition) <- topics) {
+        val fetched = response.messageSet(topic, partition)
+        TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
+      }
     }
 
     // temporarily set request handler logger to a higher level
@@ -175,34 +208,34 @@ class PrimitiveApiTest extends JUnit3Sui
 
     {
       // send some invalid offsets
-      val fetches = new mutable.ArrayBuffer[FetchRequest]
-      for(topic <- topics)
-        fetches += new FetchRequest(topic, 0, -1, 10000)
+      val builder = new FetchRequestBuilder()
+      for( (topic, partition) <- topics)
+        builder.addFetch(topic, partition, -1, 10000)
 
       try {
-        val responses = consumer.multifetch(fetches: _*)
-        for(resp <- responses)
-          resp.iterator
-        fail("expect exception")
-      }
-      catch {
+        val request = builder.build()
+        val response = consumer.fetch(request)
+        for( (topic, partition) <- topics)
+          response.messageSet(topic, partition).iterator
+        fail("Expected exception when fetching message with invalid offset")
+      } catch {
         case e: OffsetOutOfRangeException => "this is good"
       }
     }
 
     {
       // send some invalid partitions
-      val fetches = new mutable.ArrayBuffer[FetchRequest]
-      for(topic <- topics)
-        fetches += new FetchRequest(topic, -1, 0, 10000)
+      val builder = new FetchRequestBuilder()
+      for( (topic, _) <- topics)
+        builder.addFetch(topic, -1, 0, 10000)
 
       try {
-        val responses = consumer.multifetch(fetches: _*)
-        for(resp <- responses)
-          resp.iterator
-        fail("expect exception")
-      }
-      catch {
+        val request = builder.build()
+        val response = consumer.fetch(request)
+        for( (topic, _) <- topics)
+          response.messageSet(topic, -1).iterator
+        fail("Expected exception when fetching message with invalid partition")
+      } catch {
         case e: InvalidPartitionException => "this is good"
       }
     }
@@ -213,16 +246,16 @@ class PrimitiveApiTest extends JUnit3Sui
 
   def testMultiProduce() {
     // send some messages
-    val topics = List("test1", "test2", "test3");
+    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     val messages = new mutable.HashMap[String, ByteBufferMessageSet]
-    val fetches = new mutable.ArrayBuffer[FetchRequest]
+    val builder = new FetchRequestBuilder()
     var produceList: List[ProducerRequest] = Nil
-    for(topic <- topics) {
+    for( (topic, partition) <- topics) {
       val set = new ByteBufferMessageSet(NoCompressionCodec,
                                          new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
       messages += topic -> set
       produceList ::= new ProducerRequest(topic, 0, set)
-      fetches += new FetchRequest(topic, 0, 0, 10000)
+      builder.addFetch(topic, partition, 0, 10000)
     }
     producer.multiSend(produceList.toArray)
 
@@ -231,23 +264,26 @@ class PrimitiveApiTest extends JUnit3Sui
       
     // wait a bit for produced message to be available
     Thread.sleep(200)
-    val response = consumer.multifetch(fetches: _*)
-    for((topic, resp) <- topics.zip(response.toList))
-      TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+    val request = builder.build()
+    val response = consumer.fetch(request)
+    for( (topic, partition) <- topics) {
+      val fetched = response.messageSet(topic, partition)
+      TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
+    }
   }
 
   def testMultiProduceWithCompression() {
     // send some messages
-    val topics = List("test1", "test2", "test3");
+    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     val messages = new mutable.HashMap[String, ByteBufferMessageSet]
-    val fetches = new mutable.ArrayBuffer[FetchRequest]
+    val builder = new FetchRequestBuilder()
     var produceList: List[ProducerRequest] = Nil
-    for(topic <- topics) {
+    for( (topic, partition) <- topics) {
       val set = new ByteBufferMessageSet(DefaultCompressionCodec,
                                          new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
       messages += topic -> set
       produceList ::= new ProducerRequest(topic, 0, set)
-      fetches += new FetchRequest(topic, 0, 0, 10000)
+      builder.addFetch(topic, partition, 0, 10000)
     }
     producer.multiSend(produceList.toArray)
 
@@ -256,15 +292,18 @@ class PrimitiveApiTest extends JUnit3Sui
 
     // wait a bit for produced message to be available
     Thread.sleep(200)
-    val response = consumer.multifetch(fetches: _*)
-    for((topic, resp) <- topics.zip(response.toList))
-      TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+    val request = builder.build()
+    val response = consumer.fetch(request)
+    for( (topic, partition) <- topics) {
+      val fetched = response.messageSet(topic, partition)
+      TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
+    }
   }
 
   def testConsumerNotExistTopic() {
     val newTopic = "new-topic"
-    val messageSetIter = consumer.fetch(new FetchRequest(newTopic, 0, 0, 10000)).iterator
-    assertTrue(messageSetIter.hasNext == false)
+    val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
+    assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
     val logFile = new File(config.logDir, newTopic + "-0")
     assertTrue(!logFile.exists)
   }
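
The new serialization test above also doubles as documentation of the request's wire round-trip: sizeInBytes reports the exact serialized size, writeTo fills a buffer, and FetchRequest.readFrom reconstructs an equal request. Reduced to its essentials (field values are placeholders):

    import java.nio.ByteBuffer
    import kafka.api.{FetchRequest, FetchRequestBuilder}

    val request = new FetchRequestBuilder()
      .correlationId(1)
      .clientId("example-client")
      .addFetch("topic1", 0, 0L, 10000)
      .build()

    // sizeInBytes is exact, so the buffer needs no slack.
    val buffer = ByteBuffer.allocate(request.sizeInBytes)
    request.writeTo(buffer)
    buffer.rewind()
    assert(request == FetchRequest.readFrom(buffer))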


