kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1152970 [15/26] - in /incubator/kafka: branches/ site/ trunk/ trunk/bin/ trunk/clients/ trunk/clients/clojure/ trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/ trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...
Date Mon, 01 Aug 2011 23:42:17 GMT
Added: incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiProducerRequest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiProducerRequest.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiProducerRequest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.network.Request
+
+object MultiProducerRequest {
+  def readFrom(buffer: ByteBuffer): MultiProducerRequest = {
+    val count = buffer.getShort
+    val produces = new Array[ProducerRequest](count)
+    for(i <- 0 until produces.length)
+      produces(i) = ProducerRequest.readFrom(buffer)
+    new MultiProducerRequest(produces)
+  }
+}
+
+class MultiProducerRequest(val produces: Array[ProducerRequest]) extends Request(RequestKeys.MultiProduce) {
+  def writeTo(buffer: ByteBuffer) {
+    if(produces.length > Short.MaxValue)
+      throw new IllegalArgumentException("Number of requests in MultiProducer exceeds " + Short.MaxValue + ".")    
+    buffer.putShort(produces.length.toShort)
+    for(produce <- produces)
+      produce.writeTo(buffer)
+  }
+
+  def sizeInBytes: Int = {
+    var size = 2
+    for(produce <- produces)
+      size += produce.sizeInBytes
+    size
+  }
+
+  override def toString(): String = {
+    val buffer = new StringBuffer
+    for(produce <- produces) {
+      buffer.append(produce.toString)
+      buffer.append(",")
+    }
+    buffer.toString
+  }  
+}
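
For reference, a minimal round-trip sketch of the framing above (a 2-byte request count followed by each nested ProducerRequest); the topic names and payload are illustrative, and the Message(bytes) and ByteBufferMessageSet varargs constructors are assumed from elsewhere in this code base:

    import java.nio.ByteBuffer
    import kafka.api.{MultiProducerRequest, ProducerRequest}
    import kafka.message.{ByteBufferMessageSet, Message}

    // illustrative payload; assumes ByteBufferMessageSet(messages: Message*)
    val messages = new ByteBufferMessageSet(new Message("hello".getBytes))
    val multi = new MultiProducerRequest(Array(
      new ProducerRequest("topic-a", 0, messages),
      new ProducerRequest("topic-b", 0, messages)))

    // serialize, then read back: the count short, then each nested request
    val buffer = ByteBuffer.allocate(multi.sizeInBytes)
    multi.writeTo(buffer)
    buffer.rewind
    val decoded = MultiProducerRequest.readFrom(buffer)
    assert(decoded.produces.length == 2)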

Added: incubator/kafka/trunk/core/src/main/scala/kafka/api/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/OffsetRequest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/OffsetRequest.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/OffsetRequest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.utils.{nonthreadsafe, Utils}
+import kafka.network.{Send, Request}
+import java.nio.channels.WritableByteChannel
+import kafka.common.ErrorMapping
+
+object OffsetRequest {
+  val SmallestTimeString = "smallest"
+  val LargestTimeString = "largest"
+  val LatestTime = -1L
+  val EarliestTime = -2L
+
+  def readFrom(buffer: ByteBuffer): OffsetRequest = {
+    val topic = Utils.readShortString(buffer, "UTF-8")
+    val partition = buffer.getInt()
+    val offset = buffer.getLong
+    val maxNumOffsets = buffer.getInt
+    new OffsetRequest(topic, partition, offset, maxNumOffsets)
+  }
+
+  def serializeOffsetArray(offsets: Array[Long]): ByteBuffer = {
+    val size = 4 + 8 * offsets.length
+    val buffer = ByteBuffer.allocate(size)
+    buffer.putInt(offsets.length)
+    for (i <- 0 until offsets.length)
+      buffer.putLong(offsets(i))
+    buffer.rewind
+    buffer
+  }
+
+  def deserializeOffsetArray(buffer: ByteBuffer): Array[Long] = {
+    val size = buffer.getInt
+    val offsets = new Array[Long](size)
+    for (i <- 0 until offsets.length)
+      offsets(i) = buffer.getLong
+    offsets
+  }
+}
+
+class OffsetRequest(val topic: String,
+                    val partition: Int,
+                    val time: Long,
+                    val maxNumOffsets: Int) extends Request(RequestKeys.Offsets) {
+
+  def writeTo(buffer: ByteBuffer) {
+    Utils.writeShortString(buffer, topic, "UTF-8")
+    buffer.putInt(partition)
+    buffer.putLong(time)
+    buffer.putInt(maxNumOffsets)
+  }
+
+  def sizeInBytes(): Int = 2 + topic.length + 4 + 8 + 4
+
+  override def toString(): String= "OffsetRequest(topic:" + topic + ", part:" + partition + ", time:" + time +
+          ", maxNumOffsets:" + maxNumOffsets + ")"
+}
+
+@nonthreadsafe
+private[kafka] class OffsetArraySend(offsets: Array[Long]) extends Send {
+  private var size: Long = offsets.foldLeft(4)((sum, _) => sum + 8)
+  private val header = ByteBuffer.allocate(6)
+  header.putInt(size.asInstanceOf[Int] + 2)
+  header.putShort(ErrorMapping.NoError.asInstanceOf[Short])
+  header.rewind()
+  private val contentBuffer = OffsetRequest.serializeOffsetArray(offsets)
+
+  var complete: Boolean = false
+
+  def writeTo(channel: WritableByteChannel): Int = {
+    expectIncomplete()
+    var written = 0
+    if(header.hasRemaining)
+      written += channel.write(header)
+    if(!header.hasRemaining && contentBuffer.hasRemaining)
+      written += channel.write(contentBuffer)
+
+    if(!contentBuffer.hasRemaining)
+      complete = true
+    written
+  }
+}
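
A short usage sketch of the pieces above; LatestTime (-1) and EarliestTime (-2) are the magic time values a client passes to ask for the newest or oldest offsets, and the offset-array helpers use a length-prefixed encoding (a 4-byte count followed by 8 bytes per offset):

    import kafka.api.OffsetRequest

    // ask for the single latest offset of partition 0 of topic "test"
    val request = new OffsetRequest("test", 0, OffsetRequest.LatestTime, 1)

    // round-trip the length-prefixed offset array encoding
    val offsets = Array(0L, 1024L, 2048L)
    val decoded = OffsetRequest.deserializeOffsetArray(
      OffsetRequest.serializeOffsetArray(offsets))
    assert(decoded.sameElements(offsets))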

Added: incubator/kafka/trunk/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/ProducerRequest.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/ProducerRequest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio._
+import kafka.message._
+import kafka.network._
+import kafka.utils._
+
+object ProducerRequest {
+  val RandomPartition = -1
+  
+  def readFrom(buffer: ByteBuffer): ProducerRequest = {
+    val topic = Utils.readShortString(buffer, "UTF-8")
+    val partition = buffer.getInt
+    val messageSetSize = buffer.getInt
+    val messageSetBuffer = buffer.slice()
+    messageSetBuffer.limit(messageSetSize)
+    buffer.position(buffer.position + messageSetSize)
+    new ProducerRequest(topic, partition, new ByteBufferMessageSet(messageSetBuffer))
+  }
+}
+
+class ProducerRequest(val topic: String,
+                      val partition: Int,
+                      val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
+
+  def writeTo(buffer: ByteBuffer) {
+    Utils.writeShortString(buffer, topic, "UTF-8")
+    buffer.putInt(partition)
+    buffer.putInt(messages.serialized.limit)
+    buffer.put(messages.serialized)
+    messages.serialized.rewind
+  }
+  
+  def sizeInBytes(): Int = 2 + topic.length + 4 + 4 + messages.sizeInBytes.asInstanceOf[Int]
+
+  def getTranslatedPartition(randomSelector: String => Int): Int = {
+    if (partition == ProducerRequest.RandomPartition)
+      return randomSelector(topic)
+    else 
+      return partition
+  }
+
+  override def toString: String = {
+    val builder = new StringBuilder()
+    builder.append("ProducerRequest(")
+    builder.append(topic + ",")
+    builder.append(partition + ",")
+    builder.append(messages.sizeInBytes)
+    builder.append(")")
+    builder.toString
+  }
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case that: ProducerRequest =>
+        (that canEqual this) && topic == that.topic && partition == that.partition &&
+                messages.equals(that.messages) 
+      case _ => false
+    }
+  }
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
+
+  override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
+
+}
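
The wire layout is a short-string topic, a 4-byte partition, a 4-byte message-set size, then the raw message-set bytes. The RandomPartition sentinel deserves a sketch: it defers the partition choice to a caller-supplied selector (the trivial selector below is a stand-in, and the message-set construction is assumed as in the earlier example):

    import kafka.api.ProducerRequest
    import kafka.message.{ByteBufferMessageSet, Message}

    val messages = new ByteBufferMessageSet(new Message("payload".getBytes))
    val req = new ProducerRequest("test", ProducerRequest.RandomPartition, messages)

    // -1 defers the choice; any non-negative partition passes through unchanged
    val selected = req.getTranslatedPartition(topic => 0)
    assert(selected == 0)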

Added: incubator/kafka/trunk/core/src/main/scala/kafka/api/RequestKeys.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/RequestKeys.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/RequestKeys.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/RequestKeys.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+object RequestKeys {
+  val Produce: Short = 0
+  val Fetch: Short = 1
+  val MultiFetch: Short = 2
+  val MultiProduce: Short = 3
+  val Offsets: Short = 4
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.cluster
+
+import java.util.Arrays
+import kafka.utils._
+import java.net.InetAddress
+import kafka.server.KafkaConfig
+import util.parsing.json.JSON
+
+/**
+ * A Kafka broker
+ */
+private[kafka] object Broker {
+  def createBroker(id: Int, brokerInfoString: String): Broker = {
+    val brokerInfo = brokerInfoString.split(":")
+    new Broker(id, brokerInfo(0), brokerInfo(1), brokerInfo(2).toInt)
+  }
+}
+
+private[kafka] class Broker(val id: Int, val creatorId: String, val host: String, val port: Int) {
+  
+  override def toString(): String = "id:" + id + ",creatorId:" + creatorId + ",host:" + host + ",port:" + port
+
+  def getZKString(): String = creatorId + ":" + host + ":" + port
+  
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case null => false
+      case n: Broker => id == n.id && host == n.host && port == n.port
+      case _ => false
+    }
+  }
+  
+  override def hashCode(): Int = Utils.hashcode(id, host, port)
+  
+}
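
Broker metadata lives in ZooKeeper as a "creatorId:host:port" string; a small round-trip sketch (the host and port are illustrative, and since Broker is private[kafka] this code would have to live under the kafka package):

    import kafka.cluster.Broker

    val broker = Broker.createBroker(0, "creator-1:localhost:9092")
    assert(broker.host == "localhost" && broker.port == 9092)
    assert(broker.getZKString == "creator-1:localhost:9092")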

Added: incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.cluster
+
+import kafka.utils._
+import scala.collection._
+
+/**
+ * The set of active brokers in the cluster
+ */
+private[kafka] class Cluster {
+  
+  private val brokers = new mutable.HashMap[Int, Broker]
+  
+  def this(brokerList: Iterable[Broker]) {
+    this()
+    for(broker <- brokerList)
+      brokers.put(broker.id, broker)
+  }
+
+  def getBroker(id: Int) = brokers.get(id).get
+  
+  def add(broker: Broker) = brokers.put(broker.id, broker)
+  
+  def remove(id: Int) = brokers.remove(id)
+  
+  def size = brokers.size
+  
+  override def toString(): String = 
+    "Cluster(" + brokers.values.mkString(", ") + ")"  
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Partition.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Partition.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Partition.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.cluster
+
+object Partition {
+  def parse(s: String): Partition = {
+    val pieces = s.split("-")
+    if(pieces.length != 2)
+      throw new IllegalArgumentException("Expected name in the form x-y.")
+    new Partition(pieces(0).toInt, pieces(1).toInt)
+  }
+}
+
+class Partition(val brokerId: Int, val partId: Int) extends Ordered[Partition] {
+
+  // construct from a "brokerId-partId" name, delegating to Partition.parse
+  def this(name: String) =
+    this(Partition.parse(name).brokerId, Partition.parse(name).partId)
+  
+  def name = brokerId + "-" + partId
+  
+  override def toString(): String = name
+
+  def compare(that: Partition) =
+    if (this.brokerId == that.brokerId)
+      this.partId - that.partId
+    else
+      this.brokerId - that.brokerId
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case that: Partition =>
+        (that canEqual this) && brokerId == that.brokerId && partId == that.partId
+      case _ => false
+    }
+  }
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[Partition]
+
+  override def hashCode: Int = 31 * (17 + brokerId) + partId
+
+}
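
The ordering sorts by broker id first, then partition id, so a sorted collection groups partitions by broker; a quick sketch:

    import kafka.cluster.Partition

    val parts = List(Partition.parse("2-0"), Partition.parse("1-1"), Partition.parse("1-0"))
    assert(parts.sorted.map(_.name) == List("1-0", "1-1", "2-0"))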

Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import kafka.consumer._
+import kafka.message.InvalidMessageException
+import java.nio.ByteBuffer
+import java.lang.Throwable
+
+/**
+ * A bi-directional mapping between error codes and exceptions.
+ */
+object ErrorMapping {
+  val EmptyByteBuffer = ByteBuffer.allocate(0)
+
+  val UnknownCode = -1
+  val NoError = 0
+  val OffsetOutOfRangeCode = 1
+  val InvalidMessageCode = 2
+  val WrongPartitionCode = 3
+  val InvalidFetchSizeCode = 4
+
+  private val exceptionToCode = 
+    Map[Class[Throwable], Int](
+      classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OffsetOutOfRangeCode,
+      classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
+      classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> WrongPartitionCode,
+      classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode
+    ).withDefaultValue(UnknownCode)
+  
+  /* invert the mapping */
+  private val codeToException = 
+    (Map[Int, Class[Throwable]]() ++ exceptionToCode.iterator.map(p => (p._2, p._1))).withDefaultValue(classOf[UnknownException])
+  
+  def codeFor(exception: Class[Throwable]): Int = exceptionToCode(exception)
+  
+  def maybeThrowException(code: Int) =
+    if(code != 0)
+      throw codeToException(code).newInstance()
+}
+
+class InvalidTopicException(message: String) extends RuntimeException(message) {
+  def this() = this(null)  
+}
+
+class MessageSizeTooLargeException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
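
Error codes travel on the wire as shorts; a sketch of both directions of the mapping (the Class[Throwable] cast mirrors the ones used in the map above):

    import kafka.common.{ErrorMapping, OffsetOutOfRangeException}

    val code = ErrorMapping.codeFor(
      classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]])
    assert(code == ErrorMapping.OffsetOutOfRangeCode)

    // a receiver rethrows any non-zero code as the mapped exception;
    // unrecognized codes surface as UnknownException
    try ErrorMapping.maybeThrowException(code)
    catch { case e: OffsetOutOfRangeException => () } // expected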

Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidConfigException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidConfigException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidConfigException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidConfigException.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.common
+
+/**
+ * Indicates that the given config parameter has an invalid value
+ */
+class InvalidConfigException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidMessageSizeException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidMessageSizeException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidMessageSizeException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidMessageSizeException.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,9 @@
+package kafka.common
+
+/**
+ * Indicates that a fetched message has an invalid size
+ */
+class InvalidMessageSizeException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
+

Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidPartitionException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidPartitionException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidPartitionException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidPartitionException.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.common
+
+/**
+ * Indicates that the partition id is not between 0 and numPartitions-1
+ */
+class InvalidPartitionException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/NoBrokersForPartitionException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/NoBrokersForPartitionException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/NoBrokersForPartitionException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/NoBrokersForPartitionException.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,9 @@
+package kafka.common
+
+/**
+ * Thrown when a request is made for a partition, but no brokers hosting
+ * that topic exist.
+ */
+class NoBrokersForPartitionException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file

Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/OffsetOutOfRangeException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/OffsetOutOfRangeException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/OffsetOutOfRangeException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/OffsetOutOfRangeException.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the client has requested a range no longer available on the server
+ */
+class OffsetOutOfRangeException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
+

Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/UnavailableProducerException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/UnavailableProducerException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/UnavailableProducerException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/UnavailableProducerException.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.common
+
+/**
+ * Indicates a producer pool initialization problem
+*/
+class UnavailableProducerException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownCodecException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownCodecException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownCodecException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownCodecException.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates that a message specifies an unknown compression codec
+ */
+class UnknownCodecException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
+

Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownException.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * If we don't know what else it is, call it this
+ */
+class UnknownException extends RuntimeException

Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownMagicByteException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownMagicByteException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownMagicByteException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownMagicByteException.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates that a message has an unknown magic byte (format version)
+ */
+class UnknownMagicByteException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
+

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import scala.collection.mutable._
+import scala.collection.JavaConversions._
+import org.I0Itec.zkclient._
+import joptsimple._
+import org.apache.log4j.Logger
+import java.util.Arrays.asList
+import java.util.Properties
+import java.util.Random
+import java.io.PrintStream
+import kafka.message._
+import kafka.utils.Utils
+import kafka.utils.ZkUtils
+import kafka.utils.StringSerializer
+
+/**
+ * Consumer that dumps messages out to standard out.
+ *
+ */
+object ConsoleConsumer {
+  
+  private val logger = Logger.getLogger(getClass())
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+    val topicIdOpt = parser.accepts("topic", "REQUIRED: The topic id to consume on.")
+                           .withRequiredArg
+                           .describedAs("topic")
+                           .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + 
+                                      "Multiple URLS can be given to allow fail-over.")
+                           .withRequiredArg
+                           .describedAs("urls")
+                           .ofType(classOf[String])
+    val groupIdOpt = parser.accepts("group", "The group id to consume on.")
+                           .withRequiredArg
+                           .describedAs("gid")
+                           .defaultsTo("console-consumer-" + new Random().nextInt(100000))   
+                           .ofType(classOf[String])
+    val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
+                           .withRequiredArg
+                           .describedAs("size")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(1024 * 1024)   
+    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
+                           .withRequiredArg
+                           .describedAs("size")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(2 * 1024 * 1024)
+    val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
+                           .withRequiredArg
+                           .describedAs("class")
+                           .ofType(classOf[String])
+                           .defaultsTo(classOf[NewlineMessageFormatter].getName)
+    val messageFormatterArgOpt = parser.accepts("property")
+                           .withRequiredArg
+                           .describedAs("prop")
+                           .ofType(classOf[String])
+    val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
+        "start with the earliest message present in the log rather than the latest message.")
+    val autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval at which to save the current offset in ms")
+                           .withRequiredArg
+                           .describedAs("ms")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(10*1000)
+    val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
+                           .withRequiredArg
+                           .describedAs("num_messages")
+                           .ofType(classOf[java.lang.Integer])
+    val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
+        "skip it instead of halting.")
+
+    val options: OptionSet = tryParse(parser, args)
+    checkRequiredArgs(parser, options, topicIdOpt, zkConnectOpt)
+    
+    val props = new Properties()
+    props.put("groupid", options.valueOf(groupIdOpt))
+    props.put("socket.buffer.size", options.valueOf(socketBufferSizeOpt).toString)
+    props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
+    props.put("auto.commit", "true")
+    props.put("autocommit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
+    props.put("autooffset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
+    props.put("zk.connect", options.valueOf(zkConnectOpt))
+    val config = new ConsumerConfig(props)
+    val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
+    
+    val topic = options.valueOf(topicIdOpt)
+    val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
+    val formatterArgs = tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
+    
+    val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
+
+    val connector = Consumer.create(config)
+    
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run() {
+        connector.shutdown()
+        // if no group is specified, avoid polluting zookeeper with persistent group data (this is a hack)
+        if(!options.has(groupIdOpt))  
+          tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt))
+      }
+    })
+    
+    var stream: KafkaMessageStream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0)
+    val iter =
+      if(maxMessages >= 0)
+        stream.slice(0, maxMessages)
+      else
+        stream
+
+    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+    formatter.init(formatterArgs)
+
+    try {
+      for(message <- iter) {
+        try {
+          formatter.writeTo(message, System.out)
+        } catch {
+          case e =>
+            if (skipMessageOnError)
+              logger.error("error processing message, skipping and resuming consumption: " + e)
+            else
+              throw e
+        }
+      }
+    } catch {
+      case e => logger.error("error processing message, stopping consumption: " + e)
+    }
+      
+    System.out.flush()
+    formatter.close()
+    connector.shutdown()
+  }
+  
+  def tryParse(parser: OptionParser, args: Array[String]) = {
+    try {
+      parser.parse(args : _*)
+    } catch {
+      case e: OptionException => {
+        Utils.croak(e.getMessage)
+        null
+      }
+    }
+  }
+  
+  def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
+    for(arg <- required) {
+      if(!options.has(arg)) {
+        logger.error("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+  }
+  
+  def tryParseFormatterArgs(args: Iterable[String]): Properties = {
+    val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
+    if(!splits.forall(_.length == 2)) {
+      System.err.println("Invalid parser arguments: " + args.mkString(" "))
+      System.exit(1)
+    }
+    val props = new Properties
+    for(a <- splits)
+      props.put(a(0), a(1))
+    props
+  }
+  
+  trait MessageFormatter {
+    def writeTo(message: Message, output: PrintStream)
+    def init(props: Properties) {}
+    def close() {}
+  }
+  
+  class NewlineMessageFormatter extends MessageFormatter {
+    def writeTo(message: Message, output: PrintStream) {
+      val payload = message.payload
+      output.write(payload.array, payload.arrayOffset, payload.limit)
+      output.write('\n')
+    }
+  }
+  
+  def tryCleanupZookeeper(zkUrl: String, groupId: String) {
+    try {
+      val dir = "/consumers/" + groupId
+      logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
+      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, StringSerializer)
+      zk.deleteRecursive(dir)
+      zk.close()
+    } catch {
+      case _ => // swallow
+    }
+  }
+   
+}
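
A pluggable formatter only needs to implement writeTo; a hypothetical example that hex-dumps each payload (pass its fully qualified class name via --formatter):

    import java.io.PrintStream
    import kafka.consumer.ConsoleConsumer.MessageFormatter
    import kafka.message.Message

    class HexMessageFormatter extends MessageFormatter {
      def writeTo(message: Message, output: PrintStream) {
        val payload = message.payload // a ByteBuffer view of the message body
        val bytes = new Array[Byte](payload.remaining)
        payload.get(bytes)
        output.println(bytes.map("%02x".format(_)).mkString)
      }
    }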

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.Properties
+import kafka.utils.{ZKConfig, Utils}
+import kafka.api.OffsetRequest
+
+object ConsumerConfig {
+  val SocketTimeout = 30 * 1000
+  val SocketBufferSize = 64*1024
+  val FetchSize = 300 * 1024
+  val MaxFetchSize = 10*FetchSize
+  val BackoffIncrementMs = 1000
+  val AutoCommit = true
+  val AutoCommitInterval = 10 * 1000
+  val MaxQueuedChunks = 100
+  val AutoOffsetReset = OffsetRequest.SmallestTimeString
+  val ConsumerTimeoutMs = -1
+  val EmbeddedConsumerTopics = ""
+}
+
+class ConsumerConfig(props: Properties) extends ZKConfig(props) {
+  import ConsumerConfig._
+
+  /** a string that uniquely identifies a set of consumers within the same consumer group */
+  val groupId = Utils.getString(props, "groupid")
+
+  /** consumer id: generated automatically if not set.
+   *  Set this explicitly only for testing purposes. */
+  val consumerId: Option[String] = /** TODO: can be written better in scala 2.8 */
+    if (Utils.getString(props, "consumerid", null) != null) Some(Utils.getString(props, "consumerid")) else None
+
+  /** the socket timeout for network requests */
+  val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", SocketTimeout)
+  
+  /** the socket receive buffer for network requests */
+  val socketBufferSize = Utils.getInt(props, "socket.buffersize", SocketBufferSize)
+  
+  /** the number of bytes of messages to attempt to fetch */
+  val fetchSize = Utils.getInt(props, "fetch.size", FetchSize)
+  
+  /** the maximum allowable fetch size for a very large message */
+  val maxFetchSize: Int = fetchSize * 10
+  
+  /** to avoid repeatedly polling a broker node which has no new data
+      we will back off every time we get an empty set from the broker */
+  val backoffIncrementMs: Long = Utils.getInt(props, "backoff.increment.ms", BackoffIncrementMs)
+  
+  /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
+  val autoCommit = Utils.getBoolean(props, "autocommit.enable", AutoCommit)
+  
+  /** the frequency in ms that the consumer offsets are committed to zookeeper */
+  val autoCommitIntervalMs = Utils.getInt(props, "autocommit.interval.ms", AutoCommitInterval)
+
+  /** max number of messages buffered for consumption */
+  val maxQueuedChunks = Utils.getInt(props, "queuedchunks.max", MaxQueuedChunks)
+
+  /* what to do if an offset is out of range.
+     smallest : automatically reset the offset to the smallest offset
+     largest : automatically reset the offset to the largest offset
+     anything else: throw exception to the consumer */
+  val autoOffsetReset = Utils.getString(props, "autooffset.reset", AutoOffsetReset)
+
+  /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
+  val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
+
+  /* embed a consumer in the broker. e.g., topic1:1,topic2:1 */
+  val embeddedConsumerTopicMap = Utils.getConsumerTopicMap(Utils.getString(props, "embeddedconsumer.topics",
+    EmbeddedConsumerTopics))
+}
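
A minimal configuration sketch: groupid plus the zk.connect string (read by the ZKConfig base class) are the only required properties; everything else falls back to the defaults above:

    import java.util.Properties
    import kafka.consumer.ConsumerConfig

    val props = new Properties
    props.put("groupid", "my-group")
    props.put("zk.connect", "localhost:2181")
    val config = new ConsumerConfig(props)
    assert(config.fetchSize == ConsumerConfig.FetchSize) // default applied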

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import scala.collection._
+import kafka.utils.Utils
+import org.apache.log4j.Logger
+
+/**
+ *  Main interface for consumer
+ */
+trait ConsumerConnector {
+  /**
+   *  Create a list of MessageStreams for each topic.
+   *
+   *  @param topicCountMap  a map of (topic, #streams) pairs
+   *  @return a map of (topic, list of KafkaMessageStream) pairs. The number of items in the
+   *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
+   */
+  def createMessageStreams(topicCountMap: Map[String,Int]) : Map[String,List[KafkaMessageStream]]
+
+  /**
+   *  Commit the offsets of all broker partitions connected by this connector.
+   */
+  def commitOffsets
+  
+  /**
+   *  Shut down the connector
+   */
+  def shutdown()
+}
+
+object Consumer {
+  private val logger = Logger.getLogger(getClass())  
+  private val consumerStatsMBeanName = "kafka:type=kafka.ConsumerStats"
+
+  /**
+   *  Create a ConsumerConnector
+   *
+   *  @param config  at a minimum, the groupid of the consumer and the ZooKeeper
+   *                 connection string zk.connect must be specified.
+   */
+  def create(config: ConsumerConfig): ConsumerConnector = {
+    val consumerConnect = new ZookeeperConsumerConnector(config)
+    Utils.swallow(logger.warn, Utils.registerMBean(consumerConnect, consumerStatsMBeanName))
+    consumerConnect
+  }
+
+  /**
+   *  Create a ConsumerConnector
+   *
+   *  @param config  at a minimum, the groupid of the consumer and the ZooKeeper
+   *                 connection string zk.connect must be specified.
+   */
+  def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector = {
+    val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
+    Utils.swallow(logger.warn, Utils.registerMBean(consumerConnect.underlying, consumerStatsMBeanName))
+    consumerConnect
+  }
+}
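
End to end, a consumer looks like the sketch below; the topic "test" and the ZooKeeper address are illustrative, and iteration blocks until messages arrive (or throws ConsumerTimeoutException if consumer.timeout.ms is set):

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    val props = new Properties
    props.put("groupid", "my-group")
    props.put("zk.connect", "localhost:2181")
    val connector = Consumer.create(new ConsumerConfig(props))

    // two parallel streams for topic "test"; each element is a Message
    val streams = connector.createMessageStreams(Map("test" -> 2))
    for (stream <- streams("test"); message <- stream)
      println(message)

    // shut down from another thread or a JVM shutdown hook, as
    // ConsoleConsumer does: connector.shutdown()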

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import kafka.utils.IteratorTemplate
+import org.apache.log4j.Logger
+import java.util.concurrent.{TimeUnit, BlockingQueue}
+import kafka.cluster.Partition
+import kafka.message.{MessageAndOffset, MessageSet, Message}
+
+/**
+ * An iterator that blocks until a value can be read from the supplied queue.
+ * The iterator recognizes a special shutdown-command chunk, which can be added
+ * to the queue to trigger a shutdown.
+ * 
+ */
+class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int)
+        extends IteratorTemplate[Message] {
+  
+  private val logger = Logger.getLogger(classOf[ConsumerIterator])
+  private var current: Iterator[MessageAndOffset] = null
+  private var currentDataChunk: FetchedDataChunk = null
+  private var currentTopicInfo: PartitionTopicInfo = null
+  private var consumedOffset: Long = -1L
+
+  override def next(): Message = {
+    val message = super.next
+    if(consumedOffset < 0)
+      throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
+    currentTopicInfo.resetConsumeOffset(consumedOffset)
+    if(logger.isTraceEnabled)
+      logger.trace("Setting consumed offset to %d".format(consumedOffset))
+    message
+  }
+
+  protected def makeNext(): Message = {
+    // if we don't have an iterator, get one
+    if(current == null || !current.hasNext) {
+      if (consumerTimeoutMs < 0)
+        currentDataChunk = channel.take
+      else {
+        currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
+        if (currentDataChunk == null) {
+          throw new ConsumerTimeoutException
+        }
+      }
+      if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
+        if(logger.isDebugEnabled)
+          logger.debug("Received the shutdown command")
+        channel.offer(currentDataChunk)
+        return allDone
+      } else {
+        currentTopicInfo = currentDataChunk.topicInfo
+        if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
+          logger.error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
+                        .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
+          currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
+        }
+        current = currentDataChunk.messages.iterator
+      }
+    }
+    val item = current.next
+    consumedOffset = item.offset
+    item.message
+  }
+  
+}
+
+class ConsumerTimeoutException() extends RuntimeException()

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import kafka.message.ByteBufferMessageSet
+
+private[consumer] class FetchedDataChunk(val messages: ByteBufferMessageSet,
+                                         val topicInfo: PartitionTopicInfo,
+                                         val fetchOffset: Long)

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import scala.collection._
+import org.apache.log4j.Logger
+import kafka.cluster._
+import org.I0Itec.zkclient.ZkClient
+import java.util.concurrent.BlockingQueue
+
+/**
+ * The fetcher is a background thread that fetches data from a set of servers
+ */
+private[consumer] class Fetcher(val config: ConsumerConfig, val zkClient : ZkClient) {
+  private val logger = Logger.getLogger(getClass())
+  private val EMPTY_FETCHER_THREADS = new Array[FetcherRunnable](0)
+  @volatile
+  private var fetcherThreads : Array[FetcherRunnable] = EMPTY_FETCHER_THREADS
+
+  /**
+   *  shutdown all fetch threads
+   */
+  def shutdown() {
+    // shutdown the old fetcher threads, if any
+    for (fetcherThread <- fetcherThreads)
+      fetcherThread.shutdown
+    fetcherThreads = EMPTY_FETCHER_THREADS
+  }
+
+  /**
+   *  Open connections.
+   */
+  def initConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
+                      queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
+    shutdown
+
+    if (topicInfos == null)
+      return
+
+    queuesToBeCleared.foreach(_.clear)
+
+    // re-arrange by broker id
+    val m = new mutable.HashMap[Int, List[PartitionTopicInfo]]
+    for(info <- topicInfos) {
+      m.get(info.brokerId) match {
+        case None => m.put(info.brokerId, List(info))
+        case Some(lst) => m.put(info.brokerId, info :: lst)
+      }
+    }
+
+    // open a new fetcher thread for each broker
+    val ids = Set() ++ topicInfos.map(_.brokerId)
+    val brokers = ids.map(cluster.getBroker(_))
+    fetcherThreads = new Array[FetcherRunnable](brokers.size)
+    var i = 0
+    for(broker <- brokers) {
+      val fetcherThread = new FetcherRunnable("FetchRunnable-" + i, zkClient, config, broker, m.get(broker.id).get)
+      fetcherThreads(i) = fetcherThread
+      fetcherThread.start
+      i +=1
+    }
+  }    
+}
+
+

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.concurrent.CountDownLatch
+import org.apache.log4j.Logger
+import java.nio.channels.{ClosedChannelException, ClosedByInterruptException}
+import kafka.common.{OffsetOutOfRangeException, ErrorMapping}
+import kafka.cluster.{Partition, Broker}
+import kafka.api.{MultiFetchResponse, OffsetRequest, FetchRequest}
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils._
+import java.io.IOException
+
+class FetcherRunnable(val name: String,
+                      val zkClient : ZkClient,
+                      val config: ConsumerConfig,
+                      val broker: Broker,
+                      val partitionTopicInfos: List[PartitionTopicInfo])
+  extends Thread(name) {
+  private val logger = Logger.getLogger(getClass())
+  private val shutdownLatch = new CountDownLatch(1)
+  private val simpleConsumer = new SimpleConsumer(broker.host, broker.port, config.socketTimeoutMs,
+    config.socketBufferSize)
+  @volatile
+  private var stopped = false
+
+  def shutdown(): Unit = {
+    stopped = true
+    interrupt
+    logger.debug("awaiting shutdown on fetcher " + name)
+    shutdownLatch.await
+    logger.debug("shutdown of fetcher " + name + " thread complete")
+  }
+
+  override def run() {
+    for (info <- partitionTopicInfos)
+      logger.info(name + " start fetching topic: " + info.topic + " part: " + info.partition.partId + " offset: "
+        + info.getFetchOffset + " from " + broker.host + ":" + broker.port)
+
+    try {
+      while (!stopped) {
+        val fetches = partitionTopicInfos.map(info =>
+          new FetchRequest(info.topic, info.partition.partId, info.getFetchOffset, config.fetchSize))
+
+        if (logger.isTraceEnabled)
+          logger.trace("fetch request: " + fetches.toString)
+
+        val response = simpleConsumer.multifetch(fetches : _*)
+
+        var read = 0L
+
+        for((messages, info) <- response.zip(partitionTopicInfos)) {
+          try {
+            var done = false
+            if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) {
+              logger.info("offset " + info.getFetchOffset + " out of range")
+              // see if we can fix this error
+              val resetOffset = resetConsumerOffsets(info.topic, info.partition)
+              if(resetOffset >= 0) {
+                info.resetFetchOffset(resetOffset)
+                info.resetConsumeOffset(resetOffset)
+                done = true
+              }
+            }
+            if (!done)
+              read += info.enqueue(messages, info.getFetchOffset)
+          }
+          catch {
+            case e1: IOException =>
+              // something is wrong with the socket, re-throw the exception to stop the fetcher
+              throw e1
+            case e2 =>
+              if (!stopped) {
+                // this is likely a repeatable error, log it and trigger an exception in the consumer
+                logger.error("error in FetcherRunnable for " + info, e2)
+                info.enqueueError(e2, info.getFetchOffset)
+              }
+              // re-throw the exception to stop the fetcher
+              throw e2
+          }
+        }
+
+        if (logger.isTraceEnabled)
+          logger.trace("fetched bytes: " + read)
+        if(read == 0) {
+          logger.debug("backing off " + config.backoffIncrementMs + " ms")
+          Thread.sleep(config.backoffIncrementMs)
+        }
+      }
+    }
+    catch {
+      case e =>
+        if (stopped)
+          logger.info("FecherRunnable " + this + " interrupted")
+        else
+          logger.error("error in FetcherRunnable ", e)
+    }
+
+    logger.info("stopping fetcher " + name + " to host " + broker.host)
+    Utils.swallow(logger.info, simpleConsumer.close)
+    shutdownComplete()
+  }
+
+  /**
+   * Record that the thread shutdown is complete
+   */
+  private def shutdownComplete() = shutdownLatch.countDown
+
+  private def resetConsumerOffsets(topic : String,
+                                   partition: Partition) : Long = {
+    var offset : Long = 0
+    config.autoOffsetReset match {
+      case OffsetRequest.SmallestTimeString => offset = OffsetRequest.EarliestTime
+      case OffsetRequest.LargestTimeString => offset = OffsetRequest.LatestTime
+      case _ => return -1
+    }
+
+    // get mentioned offset from the broker
+    val offsets = simpleConsumer.getOffsetsBefore(topic, partition.partId, offset, 1)
+    val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
+
+    // reset manually in zookeeper
+    logger.info("updating partition " + partition.name + " with " + (if(offset == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + offsets(0))
+    ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition.name, offsets(0).toString)
+
+    offsets(0)
+  }
+}
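One pass of the fetch loop above, sketched against a single broker using the SimpleConsumer added later in this commit. The host, port, topic, and sizes are made-up values; shallowValidBytes is the same per-chunk byte count that PartitionTopicInfo.enqueue adds to the fetch offset:

    import kafka.api.FetchRequest
    import kafka.consumer.SimpleConsumer

    object MultiFetchSketch {
      def main(args: Array[String]) {
        val consumer = new SimpleConsumer("localhost", 9092, 30 * 1000, 64 * 1024)
        try {
          // one FetchRequest per partition, as the runnable builds from its PartitionTopicInfos
          val fetches = List(new FetchRequest("test", 0, 0L, 300 * 1024),
                             new FetchRequest("test", 1, 0L, 300 * 1024))
          var read = 0L
          for (messages <- consumer.multifetch(fetches: _*))
            read += messages.shallowValidBytes
          if (read == 0)
            Thread.sleep(1000) // the runnable backs off by config.backoffIncrementMs here
        } finally {
          consumer.close()
        }
      }
    }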

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.concurrent.BlockingQueue
+import org.apache.log4j.Logger
+import kafka.message.Message
+
+
+/**
+ * All calls to iterator() return the same thread-safe iterator. A separate thread
+ * feeds messages into the underlying blocking queue for processing.
+ */
+class KafkaMessageStream(private val queue: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int)
+   extends Iterable[Message] with java.lang.Iterable[Message] {
+
+  private val logger = Logger.getLogger(getClass())
+  private val iter: ConsumerIterator = new ConsumerIterator(queue, consumerTimeoutMs)
+    
+  /**
+   *  Create an iterator over messages in the stream.
+   */
+  def iterator(): ConsumerIterator = iter
+}
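Consuming is then a plain iteration over the stream. A minimal sketch, assuming a KafkaMessageStream already handed out by the high-level consumer (stream creation is outside this file) and assuming Message's payload accessor:

    import kafka.consumer.KafkaMessageStream

    object StreamConsumerSketch {
      // blocks on the underlying queue; with a non-negative consumerTimeoutMs the
      // iterator gives up when no message arrives in time
      def consume(stream: KafkaMessageStream) {
        for (message <- stream)
          println("received a message of " + message.payload.limit + " payload bytes")
      }
    }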

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.nio.channels._
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import kafka.message._
+import kafka.cluster._
+import kafka.common.ErrorMapping
+import org.apache.log4j.Logger
+
+private[consumer] class PartitionTopicInfo(val topic: String,
+                                           val brokerId: Int,
+                                           val partition: Partition,
+                                           private val chunkQueue: BlockingQueue[FetchedDataChunk],
+                                           private val consumedOffset: AtomicLong,
+                                           private val fetchedOffset: AtomicLong,
+                                           private val fetchSize: AtomicInteger) {
+  private val logger = Logger.getLogger(getClass())
+  if (logger.isDebugEnabled) {
+    logger.debug("initial consumer offset of " + this + " is " + consumedOffset.get)
+    logger.debug("initial fetch offset of " + this + " is " + fetchedOffset.get)
+  }
+
+  def getConsumeOffset() = consumedOffset.get
+
+  def getFetchOffset() = fetchedOffset.get
+
+  def resetConsumeOffset(newConsumeOffset: Long) = {
+    consumedOffset.set(newConsumeOffset)
+    if (logger.isDebugEnabled)
+      logger.debug("reset consume offset of " + this + " to " + newConsumeOffset)
+  }
+
+  def resetFetchOffset(newFetchOffset: Long) = {
+    fetchedOffset.set(newFetchOffset)
+    if (logger.isDebugEnabled)
+      logger.debug("reset fetch offset of ( %s ) to %d".format(this, newFetchOffset))
+  }
+
+  /**
+   * Enqueue a message set for processing
+   * @return the number of valid bytes
+   */
+  def enqueue(messages: ByteBufferMessageSet, fetchOffset: Long): Long = {
+    val size = messages.shallowValidBytes
+    if(size > 0) {
+      // advance the fetched offset by the compressed data chunk size, not the decompressed message set size
+      if(logger.isTraceEnabled)
+        logger.trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size)
+      val newOffset = fetchedOffset.addAndGet(size)
+      if (logger.isDebugEnabled)
+        logger.debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
+      chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
+    }
+    size
+  }
+
+  /**
+   *  add an empty message with the exception to the queue so that client can see the error
+   */
+  def enqueueError(e: Throwable, fetchOffset: Long) = {
+    val messages = new ByteBufferMessageSet(ErrorMapping.EmptyByteBuffer, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
+  }
+
+  override def toString(): String = topic + ":" + partition.toString + ": fetched offset = " + fetchedOffset.get +
+    ": consumed offset = " + consumedOffset.get
+}
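Note that the fetch offset advances by bytes, not by message count. A self-contained sketch of that bookkeeping with made-up chunk sizes:

    import java.util.concurrent.atomic.AtomicLong

    object OffsetBookkeepingSketch {
      def main(args: Array[String]) {
        val fetchedOffset = new AtomicLong(0L)
        // simulate enqueuing three chunks whose shallowValidBytes are 4096, 1024 and 512
        for (chunkBytes <- List(4096L, 1024L, 512L)) {
          val newOffset = fetchedOffset.addAndGet(chunkBytes)
          println("fetch offset advanced to " + newOffset)
        }
        // an out-of-range reset would instead call fetchedOffset.set(resetOffset)
      }
    }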

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.net._
+import java.nio._
+import java.nio.channels._
+import java.util.concurrent.atomic._
+import org.apache.log4j.Logger
+import kafka.api._
+import kafka.common._
+import kafka.message._
+import kafka.network._
+import kafka.utils._
+
+/**
+ * A consumer of kafka messages
+ */
+@threadsafe
+class SimpleConsumer(val host: String,
+                     val port: Int,
+                     val soTimeout: Int,
+                     val bufferSize: Int) {
+  private val logger = Logger.getLogger(getClass())
+  private var channel : SocketChannel = null
+  private val lock = new Object()
+
+  private def connect(): SocketChannel = {
+    close
+    val address = new InetSocketAddress(host, port)
+
+    val channel = SocketChannel.open
+    if(logger.isDebugEnabled)
+      logger.debug("Connected to " + address + " for fetching.")
+    channel.configureBlocking(true)
+    channel.socket.setReceiveBufferSize(bufferSize)
+    channel.socket.setSoTimeout(soTimeout)
+    channel.socket.setKeepAlive(true)
+    channel.connect(address)
+    if(logger.isTraceEnabled) {
+      logger.trace("requested receive buffer size=" + bufferSize + " actual receive buffer size= " + channel.socket.getReceiveBufferSize)
+      logger.trace("soTimeout=" + soTimeout + " actual soTimeout= " + channel.socket.getSoTimeout)
+    }
+    channel
+  }
+
+  private def close(channel: SocketChannel) = {
+    if(logger.isDebugEnabled)
+      logger.debug("Disconnecting from " + channel.socket.getRemoteSocketAddress())
+    Utils.swallow(logger.warn, channel.close())
+    Utils.swallow(logger.warn, channel.socket.close())
+  }
+
+  def close() {
+    lock synchronized {
+      if (channel != null)
+        close(channel)
+      channel = null
+    }
+  }
+
+  /**
+   *  Fetch a set of messages from a topic.
+   *
+   *  @param request  specifies the topic name, topic partition, starting byte offset, and maximum bytes to be fetched.
+   *  @return a set of fetched messages
+   */
+  def fetch(request: FetchRequest): ByteBufferMessageSet = {
+    lock synchronized {
+      val startTime = SystemTime.nanoseconds
+      getOrMakeConnection()
+      var response: Tuple2[Receive,Int] = null
+      try {
+        sendRequest(request)
+        response = getResponse
+      } catch {
+        case e : java.io.IOException =>
+          logger.info("fetch reconnect due to " + e)
+          // retry once
+          try {
+            channel = connect
+            sendRequest(request)
+            response = getResponse
+          } catch {
+            case ioe: java.io.IOException => channel = null; throw ioe;
+          }
+        case e => throw e
+      }
+      val endTime = SystemTime.nanoseconds
+      SimpleConsumerStats.recordFetchRequest(endTime - startTime)
+      SimpleConsumerStats.recordConsumptionThroughput(response._1.buffer.limit)
+      new ByteBufferMessageSet(response._1.buffer, request.offset, response._2)
+    }
+  }
+
+  /**
+   *  Combine multiple fetch requests in one call.
+   *
+   *  @param fetches  a sequence of fetch requests.
+   *  @return a sequence of fetch responses
+   */
+  def multifetch(fetches: FetchRequest*): MultiFetchResponse = {
+    lock synchronized {
+      val startTime = SystemTime.nanoseconds
+      getOrMakeConnection()
+      var response: Tuple2[Receive,Int] = null
+      try {
+        sendRequest(new MultiFetchRequest(fetches.toArray))
+        response = getResponse
+      } catch {
+        case e : java.io.IOException =>
+          logger.info("multifetch reconnect due to " + e)
+          // retry once
+          try {
+            channel = connect
+            sendRequest(new MultiFetchRequest(fetches.toArray))
+            response = getResponse
+          } catch {
+            case ioe: java.io.IOException => channel = null; throw ioe;
+          }
+        case e => throw e        
+      }
+      val endTime = SystemTime.nanoseconds
+      SimpleConsumerStats.recordFetchRequest(endTime - startTime)
+      SimpleConsumerStats.recordConsumptionThroughput(response._1.buffer.limit)
+
+      // error code will be set on individual messageset inside MultiFetchResponse
+      new MultiFetchResponse(response._1.buffer, fetches.length, fetches.toArray.map(f => f.offset))
+    }
+  }
+
+  /**
+   *  Get a list of valid offsets (up to maxSize) before the given time.
+   *  The result is a list of offsets, in descending order.
+   *
+   *  @param time time in milliseconds (-1 for the latest available offset, -2 for the earliest available offset)
+   *  @return an array of offsets
+   */
+  def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] = {
+    lock synchronized {
+      getOrMakeConnection()
+      var response: Tuple2[Receive,Int] = null
+      try {
+        sendRequest(new OffsetRequest(topic, partition, time, maxNumOffsets))
+        response = getResponse
+      } catch {
+        case e : java.io.IOException =>
+          logger.info("getOffsetsBefore reconnect due to " + e)
+          // retry once
+          try {
+            channel = connect
+            sendRequest(new OffsetRequest(topic, partition, time, maxNumOffsets))
+            response = getResponse
+          } catch {
+            case ioe: java.io.IOException => channel = null; throw ioe;
+          }
+      }
+      OffsetRequest.deserializeOffsetArray(response._1.buffer)
+    }
+  }
+
+  private def sendRequest(request: Request) = {
+    val send = new BoundedByteBufferSend(request)
+    send.writeCompletely(channel)
+  }
+
+  private def getResponse(): Tuple2[Receive,Int] = {
+    val response = new BoundedByteBufferReceive()
+    response.readCompletely(channel)
+
+    // this has the side effect of setting the initial position of buffer correctly
+    val errorCode: Int = response.buffer.getShort
+    (response, errorCode)
+  }
+
+  private def getOrMakeConnection() {
+    if(channel == null) {
+      channel = connect()
+    }
+  }
+}
+
+trait SimpleConsumerStatsMBean {
+  def getFetchRequestsPerSecond: Double
+  def getAvgFetchRequestMs: Double
+  def getMaxFetchRequestMs: Double
+  def getNumFetchRequests: Long  
+  def getConsumerThroughput: Double
+}
+
+@threadsafe
+class SimpleConsumerStats extends SimpleConsumerStatsMBean {
+  private val fetchRequestStats = new SnapshotStats
+
+  def recordFetchRequest(requestNs: Long) = fetchRequestStats.recordRequestMetric(requestNs)
+
+  def recordConsumptionThroughput(data: Long) = fetchRequestStats.recordThroughputMetric(data)
+
+  def getFetchRequestsPerSecond: Double = fetchRequestStats.getRequestsPerSecond
+
+  def getAvgFetchRequestMs: Double = fetchRequestStats.getAvgMetric / (1000.0 * 1000.0)
+
+  def getMaxFetchRequestMs: Double = fetchRequestStats.getMaxMetric / (1000.0 * 1000.0)
+
+  def getNumFetchRequests: Long = fetchRequestStats.getNumRequests
+
+  def getConsumerThroughput: Double = fetchRequestStats.getThroughput
+}
+
+object SimpleConsumerStats {
+  private val logger = Logger.getLogger(getClass())
+  private val simpleConsumerStatsMBeanName = "kafka:type=kafka.SimpleConsumerStats"
+  private val stats = new SimpleConsumerStats
+  Utils.swallow(logger.warn, Utils.registerMBean(stats, simpleConsumerStatsMBeanName))
+
+  def recordFetchRequest(requestNs: Long) = stats.recordFetchRequest(requestNs)
+  def recordConsumptionThroughput(data: Long) = stats.recordConsumptionThroughput(data)
+}
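Putting the pieces together, an offset lookup followed by a fetch looks like this. A minimal sketch; the broker address, topic, and fetch size are hypothetical:

    import kafka.api.{FetchRequest, OffsetRequest}
    import kafka.consumer.SimpleConsumer

    object SimpleConsumerSketch {
      def main(args: Array[String]) {
        val consumer = new SimpleConsumer("localhost", 9092, 30 * 1000, 64 * 1024)
        try {
          // ask the broker for the single earliest valid offset of test/0
          val offsets = consumer.getOffsetsBefore("test", 0, OffsetRequest.EarliestTime, 1)
          val messages = consumer.fetch(new FetchRequest("test", 0, offsets(0), 300 * 1024))
          println("fetched " + messages.shallowValidBytes + " valid bytes from offset " + offsets(0))
        } finally {
          consumer.close()
        }
      }
    }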
+

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import scala.collection._
+import scala.util.parsing.json.JSON
+import org.apache.log4j.Logger
+
+private[consumer] object TopicCount {
+  private val logger = Logger.getLogger(getClass())
+  val myConversionFunc = {input : String => input.toInt}
+  JSON.globalNumberParser = myConversionFunc
+
+  def constructTopicCount(consumerIdString: String, jsonString: String): TopicCount = {
+    var topMap : Map[String,Int] = null
+    try {
+      JSON.parseFull(jsonString) match {
+        case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
+        case None => throw new RuntimeException("error constructing TopicCount: " + jsonString)
+      }
+    }
+    catch {
+      case e =>
+        logger.error("error parsing consumer json string " + jsonString, e)
+        throw e
+    }
+
+    new TopicCount(consumerIdString, topMap)
+  }
+
+}
+
+private[consumer] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) {
+
+  def getConsumerThreadIdsPerTopic()
+    : Map[String, Set[String]] = {
+    val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
+    for ((topic, nConsumers) <- topicCountMap) {
+      val consumerSet = new mutable.HashSet[String]
+      assert(nConsumers >= 1)
+      for (i <- 0 until nConsumers)
+        consumerSet += consumerIdString + "-" + i
+      consumerThreadIdsPerTopicMap.put(topic, consumerSet)
+    }
+    consumerThreadIdsPerTopicMap
+  }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case null => false
+      case n: TopicCount => consumerIdString == n.consumerIdString && topicCountMap == n.topicCountMap
+      case _ => false
+    }
+  }
+
+  /**
+   *  return json of
+   *  { "topic1" : 4,
+   *    "topic2" : 4
+   *  }
+   */
+  def toJsonString() : String = {
+    val builder = new StringBuilder
+    builder.append("{ ")
+    var i = 0
+    for ( (topic, nConsumers) <- topicCountMap) {
+      if (i > 0)
+        builder.append(",")
+      builder.append("\"" + topic + "\": " + nConsumers)
+      i += 1
+    }
+    builder.append(" }")
+    builder.toString
+  }
+}
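A round trip through the helpers above; since TopicCount is private[consumer], the sketch lives in package kafka.consumer. The consumer id and counts are made up:

    package kafka.consumer

    object TopicCountSketch {
      def main(args: Array[String]) {
        val tc = TopicCount.constructTopicCount("consumer1", "{ \"topic1\": 2, \"topic2\": 1 }")
        // topic1 -> Set(consumer1-0, consumer1-1), topic2 -> Set(consumer1-0)
        println(tc.getConsumerThreadIdsPerTopic)
        println(tc.toJsonString) // e.g. { "topic1": 2,"topic2": 1 }
      }
    }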


