Added: incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiProducerRequest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiProducerRequest.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiProducerRequest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.network.Request
+
+object MultiProducerRequest {
+  // Wire layout read here: a 2-byte request count, then that many ProducerRequests back to back.
+  def readFrom(buffer: ByteBuffer): MultiProducerRequest = {
+    val requests = new Array[ProducerRequest](buffer.getShort)
+    // Fill the slots in order; each nested read advances the shared buffer.
+    requests.indices.foreach(slot => requests(slot) = ProducerRequest.readFrom(buffer))
+    new MultiProducerRequest(requests)
+  }
+}
+
+/**
+ * A batch of ProducerRequests, written as a 2-byte count followed by each request in order.
+ * The request key handed to Request is RequestKeys.MultiProduce.
+ */
+class MultiProducerRequest(val produces: Array[ProducerRequest]) extends Request(RequestKeys.MultiProduce) {
+
+  /** Writes the count, then every request; throws IllegalArgumentException if the count exceeds Short.MaxValue. */
+  def writeTo(buffer: ByteBuffer) {
+    // The count is stored in a signed 16-bit field, hence the upper bound.
+    if(produces.length > Short.MaxValue)
+      throw new IllegalArgumentException("Number of requests in MultiProducer exceeds " + Short.MaxValue + ".")
+    buffer.putShort(produces.length.toShort)
+    produces.foreach(_.writeTo(buffer))
+  }
+
+  /** Two bytes for the count plus the serialized size of every contained request. */
+  def sizeInBytes: Int = produces.foldLeft(2)((total, request) => total + request.sizeInBytes)
+
+  /** Joins the requests' string forms, each one followed by a comma (a trailing comma is kept). */
+  override def toString(): String = {
+    val text = new StringBuilder
+    produces.foreach(request => text.append(request.toString).append(","))
+    text.toString
+  }
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/api/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/OffsetRequest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/OffsetRequest.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/OffsetRequest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.utils.{nonthreadsafe, Utils}
+import kafka.network.{Send, Request}
+import java.nio.channels.WritableByteChannel
+import kafka.common.ErrorMapping
+
+object OffsetRequest {
+  val SmallestTimeString = "smallest"
+  val LargestTimeString = "largest"
+  val LatestTime = -1L
+  val EarliestTime = -2L
+
+  /** Decodes topic (short string, UTF-8), partition (int), time (long) and maxNumOffsets (int), in that order. */
+  def readFrom(buffer: ByteBuffer): OffsetRequest = {
+    val topicName = Utils.readShortString(buffer, "UTF-8")
+    val partitionId = buffer.getInt()
+    val time = buffer.getLong
+    val offsetLimit = buffer.getInt
+    new OffsetRequest(topicName, partitionId, time, offsetLimit)
+  }
+
+  /** Encodes the offsets as a 4-byte count followed by 8 bytes per offset; the result is rewound for reading. */
+  def serializeOffsetArray(offsets: Array[Long]): ByteBuffer = {
+    val encoded = ByteBuffer.allocate(4 + 8 * offsets.length)
+    encoded.putInt(offsets.length)
+    offsets.foreach(offset => encoded.putLong(offset))
+    encoded.rewind
+    encoded
+  }
+
+  /** Reads a 4-byte count and then that many 8-byte offsets, starting at the buffer's current position. */
+  def deserializeOffsetArray(buffer: ByteBuffer): Array[Long] = {
+    val offsets = new Array[Long](buffer.getInt)
+    for (slot <- offsets.indices)
+      offsets(slot) = buffer.getLong
+    offsets
+  }
+}
+
+/** An offsets request: topic, partition, time and maxNumOffsets, written to the buffer in exactly that order. */
+class OffsetRequest(val topic: String,
+                    val partition: Int,
+                    val time: Long,
+                    val maxNumOffsets: Int) extends Request(RequestKeys.Offsets) {
+  def writeTo(buffer: ByteBuffer) {
+    Utils.writeShortString(buffer, topic, "UTF-8")
+    buffer.putInt(partition)
+    buffer.putLong(time)
+    buffer.putInt(maxNumOffsets)
+  }
+
+  // Size the topic by its UTF-8 bytes: topic.length counts chars and under-reports any non-ASCII topic.
+  def sizeInBytes(): Int = 2 + topic.getBytes("UTF-8").length + 4 + 8 + 4
+  override def toString(): String= "OffsetRequest(topic:" + topic + ", part:" + partition + ", time:" + time +
+    ", maxNumOffsets:" + maxNumOffsets + ")"
+}
+
+/** A Send that transmits a 6-byte header followed by the serialized offsets array. */
+@nonthreadsafe
+private[kafka] class OffsetArraySend(offsets: Array[Long]) extends Send {
+  // Payload length: a 4-byte count plus 8 bytes per offset.
+  private val payloadSize: Int = 4 + 8 * offsets.length
+  // Header: an int holding payloadSize + 2 (the 2 matching the short that follows), then NoError as a short.
+  private val header = ByteBuffer.allocate(6)
+  header.putInt(payloadSize + 2)
+  header.putShort(ErrorMapping.NoError.toShort)
+  header.rewind()
+  private val contentBuffer = OffsetRequest.serializeOffsetArray(offsets)
+
+  var complete: Boolean = false
+
+  /** Writes what the channel accepts (header first, then content); returns the bytes written by this call. */
+  def writeTo(channel: WritableByteChannel): Int = {
+    expectIncomplete()
+    val headerBytes = if(header.hasRemaining) channel.write(header) else 0
+    // Content is only attempted once the header has been fully flushed.
+    val contentBytes = if(!header.hasRemaining && contentBuffer.hasRemaining) channel.write(contentBuffer) else 0
+    if(!contentBuffer.hasRemaining) complete = true
+    headerBytes + contentBytes
+  }
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/ProducerRequest.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/ProducerRequest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio._
+import kafka.message._
+import kafka.network._
+import kafka.utils._
+
+object ProducerRequest {
+  val RandomPartition = -1
+  /** Decodes topic, partition and a size-prefixed message set; the set is a slice sharing the buffer's content. */
+  def readFrom(buffer: ByteBuffer): ProducerRequest = {
+    val topicName = Utils.readShortString(buffer, "UTF-8")
+    val partitionId = buffer.getInt
+    val payloadLength = buffer.getInt
+    val payload = buffer.slice()
+    payload.limit(payloadLength)
+    buffer.position(buffer.position + payloadLength)
+    new ProducerRequest(topicName, partitionId, new ByteBufferMessageSet(payload))
+  }
+}
+
+/**
+ * A produce request for a single topic partition. Wire layout, as written by writeTo: the topic as a
+ * short string, the partition (int), the message set size (int), then the message set bytes.
+ */
+class ProducerRequest(val topic: String,
+                      val partition: Int,
+                      val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
+
+  /** Serializes this request into the buffer at its current position. */
+  def writeTo(buffer: ByteBuffer) {
+    Utils.writeShortString(buffer, topic, "UTF-8")
+    buffer.putInt(partition)
+    buffer.putInt(messages.serialized.limit)
+    buffer.put(messages.serialized)
+    // put() consumed the message buffer; rewind it so the message set can be read again.
+    messages.serialized.rewind
+  }
+
+  // The topic is written UTF-8 encoded, so size it by encoded bytes; topic.length counts chars and
+  // under-reports any non-ASCII topic. NOTE(review): assumes Utils.writeShortString emits a 2-byte prefix.
+  def sizeInBytes(): Int = 2 + topic.getBytes("UTF-8").length + 4 + 4 + messages.sizeInBytes.asInstanceOf[Int]
+
+  /** Resolves the target partition: randomSelector(topic) when RandomPartition was requested, else partition. */
+  def getTranslatedPartition(randomSelector: String => Int): Int =
+    if (partition == ProducerRequest.RandomPartition) randomSelector(topic) else partition
+
+  /** Renders as ProducerRequest(topic,partition,messageSetSizeInBytes). */
+  override def toString: String = "ProducerRequest(" + topic + "," + partition + "," + messages.sizeInBytes + ")"
+
+  /** Two requests are equal when topic, partition and message set are all equal. */
+  override def equals(other: Any): Boolean = {
+    other match {
+      case that: ProducerRequest =>
+        (that canEqual this) && topic == that.topic && partition == that.partition &&
+                messages.equals(that.messages)
+      case _ => false
+    }
+  }
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
+
+  // Combines the same three fields that equals compares.
+  override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
+
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/api/RequestKeys.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/RequestKeys.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/RequestKeys.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/RequestKeys.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+object RequestKeys { // Short keys handed to the Request constructor by the request classes
+ val Produce: Short = 0 // used by ProducerRequest
+ val Fetch: Short = 1
+ val MultiFetch: Short = 2
+ val MultiProduce: Short = 3 // used by MultiProducerRequest
+ val Offsets: Short = 4 // used by OffsetRequest
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.cluster
+
+import java.util.Arrays
+import kafka.utils._
+import java.net.InetAddress
+import kafka.server.KafkaConfig
+import util.parsing.json.JSON
+
+/**
+ * A Kafka broker
+ */
+private[kafka] object Broker {
+  /** Builds a Broker from a "creatorId:host:port" string, the format Broker.getZKString produces. */
+  def createBroker(id: Int, brokerInfoString: String): Broker = brokerInfoString.split(":") match {
+    case fields => new Broker(id, fields(0), fields(1), fields(2).toInt)
+  }
+}
+
+/** Broker metadata: numeric id, creator id, host and port. */
+private[kafka] class Broker(val id: Int, val creatorId: String, val host: String, val port: Int) {
+
+  override def toString(): String = "id:" + id + ",creatorId:" + creatorId + ",host:" + host + ",port:" + port
+
+  /** Renders the broker as "creatorId:host:port". */
+  def getZKString(): String = creatorId + ":" + host + ":" + port
+
+  // Equality and hashing use id, host and port only; creatorId takes no part in either.
+  override def equals(obj: Any): Boolean = obj match {
+    case other: Broker => id == other.id && host == other.host && port == other.port
+    case _ => false  // also covers null, which never matches a typed pattern
+  }
+
+  override def hashCode(): Int = Utils.hashcode(id, host, port)
+
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.cluster
+
+import kafka.utils._
+import scala.collection._
+
+/**
+ * The set of active brokers in the cluster
+ */
+private[kafka] class Cluster {
+  private val brokers = new mutable.HashMap[Int, Broker]
+
+  /** Builds a cluster holding the given brokers; a later broker replaces an earlier one with the same id. */
+  def this(brokerList: Iterable[Broker]) {
+    this()
+    brokerList.foreach(broker => brokers.put(broker.id, broker))
+  }
+
+  /** Returns the broker with this id; throws NoSuchElementException when none is registered. */
+  def getBroker(id: Int): Broker = brokers.get(id).get
+
+  /** Registers the broker under its id and returns the broker it replaced, if any. */
+  def add(broker: Broker): Option[Broker] = brokers.put(broker.id, broker)
+
+  /** Drops the broker with this id and returns it, if it was present. */
+  def remove(id: Int): Option[Broker] = brokers.remove(id)
+
+  def size: Int = brokers.size
+  override def toString(): String = "Cluster(" + brokers.values.mkString(", ") + ")"
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Partition.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Partition.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Partition.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.cluster
+
+object Partition {
+  /** Parses a "brokerId-partId" name; throws IllegalArgumentException unless it is two integers joined by '-'. */
+  def parse(s: String): Partition = {
+    val pieces = s.split("-")
+    if(pieces.length != 2) throw new IllegalArgumentException("Expected name in the form x-y.")
+    new Partition(pieces(0).toInt, pieces(1).toInt)
+  }
+}
+
+/** A partition identified by (brokerId, partId), ordered by brokerId and then partId; its name is "brokerId-partId". */
+class Partition(val brokerId: Int, val partId: Int) extends Ordered[Partition] {
+  /** Builds a partition from a "brokerId-partId" name; fails exactly as Partition.parse does on a malformed name. */
+  def this(name: String) = {
+    // A secondary constructor cannot run statements before this(...), so the name is parsed once per field.
+    this(Partition.parse(name).brokerId, Partition.parse(name).partId)
+  }
+
+  def name = brokerId + "-" + partId
+
+  override def toString(): String = name
+
+  /** Orders by brokerId, then partId, without subtraction (which overflows and flips sign for distant ids). */
+  def compare(that: Partition): Int =
+    if (this.brokerId == that.brokerId)
+      this.partId.compare(that.partId)
+    else
+      this.brokerId.compare(that.brokerId)
+
+  override def equals(other: Any): Boolean = other match {
+    case that: Partition =>
+      (that canEqual this) && brokerId == that.brokerId && partId == that.partId
+    case _ => false
+  }
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[Partition]
+
+  override def hashCode: Int = 31 * (17 + brokerId) + partId
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import kafka.consumer._
+import kafka.message.InvalidMessageException
+import java.nio.ByteBuffer
+import java.lang.Throwable
+
+/**
+ * A bi-directional mapping between error codes and exceptions
+ */
+object ErrorMapping {
+ val EmptyByteBuffer = ByteBuffer.allocate(0)
+ // Error codes. UnknownCode is what codeFor yields for unregistered exception classes; NoError means success.
+ val UnknownCode = -1
+ val NoError = 0
+ val OffsetOutOfRangeCode = 1
+ val InvalidMessageCode = 2
+ val WrongPartitionCode = 3
+ val InvalidFetchSizeCode = 4
+ // Exception class -> code. The casts are needed because Class[T] is invariant in T.
+ private val exceptionToCode =
+ Map[Class[Throwable], Int](
+ classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OffsetOutOfRangeCode,
+ classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
+ classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> WrongPartitionCode,
+ classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode
+ ).withDefaultValue(UnknownCode)
+
+ /* invert the mapping: code -> exception class; codes without an entry resolve to UnknownException */
+ private val codeToException =
+ (Map[Int, Class[Throwable]]() ++ exceptionToCode.iterator.map(p => (p._2, p._1))).withDefaultValue(classOf[UnknownException])
+ // Looks up the code registered for an exception class (UnknownCode when the class is not registered).
+ def codeFor(exception: Class[Throwable]): Int = exceptionToCode(exception)
+ // Does nothing for code 0; otherwise throws a new instance of the mapped class, created via its no-arg constructor.
+ def maybeThrowException(code: Int) =
+ if(code != 0)
+ throw codeToException(code).newInstance()
+}
+
+class InvalidTopicException(message: String) extends RuntimeException(message) {
+ def this() = this(null) // no-arg form: the exception carries a null message
+}
+
+class MessageSizeTooLargeException(message: String) extends RuntimeException(message) {
+ def this() = this(null) // no-arg form: the exception carries a null message
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidConfigException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidConfigException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidConfigException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidConfigException.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.common
+
+/**
+ * Indicates that the given config parameter has invalid value
+ */
+class InvalidConfigException(message: String) extends RuntimeException(message) {
+ def this() = this(null) // no-arg form: the exception carries a null message
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidMessageSizeException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidMessageSizeException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidMessageSizeException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidMessageSizeException.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,9 @@
+package kafka.common
+
+/**
+ * Indicates an invalid message size; ErrorMapping reports it as InvalidFetchSizeCode
+ */
+class InvalidMessageSizeException(message: String) extends RuntimeException(message) {
+ def this() = this(null) // no-arg form (null message); NOTE(review): ErrorMapping's newInstance() appears to rely on it - confirm
+}
+
Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidPartitionException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidPartitionException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidPartitionException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/InvalidPartitionException.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.common
+
+/**
+ * Indicates that the partition id is not between 0 and numPartitions-1
+ */
+class InvalidPartitionException(message: String) extends RuntimeException(message) {
+ def this() = this(null) // no-arg form (null message); NOTE(review): ErrorMapping's newInstance() appears to rely on it - confirm
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/NoBrokersForPartitionException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/NoBrokersForPartitionException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/NoBrokersForPartitionException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/NoBrokersForPartitionException.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,9 @@
+package kafka.common
+
+/**
+ * Thrown when a request is made for a broker but no brokers with that topic
+ * exist.
+ */
+class NoBrokersForPartitionException(message: String) extends RuntimeException(message) {
+ def this() = this(null) // no-arg form: the exception carries a null message
+}
\ No newline at end of file
Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/OffsetOutOfRangeException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/OffsetOutOfRangeException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/OffsetOutOfRangeException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/OffsetOutOfRangeException.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the client has requested a range no longer available on the server
+ */
+class OffsetOutOfRangeException(message: String) extends RuntimeException(message) {
+ def this() = this(null) // no-arg form (null message); NOTE(review): ErrorMapping's newInstance() appears to rely on it - confirm
+}
+
Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/UnavailableProducerException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/UnavailableProducerException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/UnavailableProducerException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/UnavailableProducerException.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.common
+
+/**
+ * Indicates a producer pool initialization problem
+*/
+class UnavailableProducerException(message: String) extends RuntimeException(message) {
+ def this() = this(null) // no-arg form: the exception carries a null message
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownCodecException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownCodecException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownCodecException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownCodecException.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates that a codec was not recognized
+ */
+class UnknownCodecException(message: String) extends RuntimeException(message) {
+ def this() = this(null) // no-arg form: the exception carries a null message
+}
+
Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownException.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * If we don't know what else it is, call it this
+ */
+class UnknownException extends RuntimeException // fallback exception class in ErrorMapping for codes without an entry
Added: incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownMagicByteException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownMagicByteException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownMagicByteException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/UnknownMagicByteException.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates that a magic byte was not recognized
+ */
+class UnknownMagicByteException(message: String) extends RuntimeException(message) {
+ def this() = this(null) // no-arg form: the exception carries a null message
+}
+
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import scala.collection.mutable._
+import scala.collection.JavaConversions._
+import org.I0Itec.zkclient._
+import joptsimple._
+import org.apache.log4j.Logger
+import java.util.Arrays.asList
+import java.util.Properties
+import java.util.Random
+import java.io.PrintStream
+import kafka.message._
+import kafka.utils.Utils
+import kafka.utils.ZkUtils
+import kafka.utils.StringSerializer
+
+/**
+ * Consumer that dumps messages out to standard out.
+ *
+ */
+object ConsoleConsumer {
+
+ private val logger = Logger.getLogger(getClass())
+
+ /**
+  * Command-line entry point. Parses the options, builds a ConsumerConfig, opens one message
+  * stream for the requested topic and writes every consumed message to standard out through
+  * the configured MessageFormatter.
+  *
+  * The "topic" and "zookeeper" options are required. When "group" is not given, the generated
+  * default group id is used and its zookeeper data is removed again by a JVM shutdown hook.
+  *
+  * @param args the raw command-line arguments
+  */
+ def main(args: Array[String]) {
+   val parser = new OptionParser
+   val topicIdOpt = parser.accepts("topic", "REQUIRED: The topic id to consume on.")
+                          .withRequiredArg
+                          .describedAs("topic")
+                          .ofType(classOf[String])
+   val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+                                     "Multiple URLS can be given to allow fail-over.")
+                          .withRequiredArg
+                          .describedAs("urls")
+                          .ofType(classOf[String])
+   val groupIdOpt = parser.accepts("group", "The group id to consume on.")
+                          .withRequiredArg
+                          .describedAs("gid")
+                          .defaultsTo("console-consumer-" + new Random().nextInt(100000))
+                          .ofType(classOf[String])
+   val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
+                          .withRequiredArg
+                          .describedAs("size")
+                          .ofType(classOf[java.lang.Integer])
+                          .defaultsTo(1024 * 1024)
+   val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
+                          .withRequiredArg
+                          .describedAs("size")
+                          .ofType(classOf[java.lang.Integer])
+                          .defaultsTo(2 * 1024 * 1024)
+   val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
+                          .withRequiredArg
+                          .describedAs("class")
+                          .ofType(classOf[String])
+                          .defaultsTo(classOf[NewlineMessageFormatter].getName)
+   val messageFormatterArgOpt = parser.accepts("property")
+                          .withRequiredArg
+                          .describedAs("prop")
+                          .ofType(classOf[String])
+   val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
+                                          "start with the earliest message present in the log rather than the latest message.")
+   val autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval at which to save the current offset in ms")
+                          .withRequiredArg
+                          .describedAs("ms")
+                          .ofType(classOf[java.lang.Integer])
+                          .defaultsTo(10*1000)
+   val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
+                          .withRequiredArg
+                          .describedAs("num_messages")
+                          .ofType(classOf[java.lang.Integer])
+   val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
+                                              "skip it instead of halt.")
+
+   val options: OptionSet = tryParse(parser, args)
+   checkRequiredArgs(parser, options, topicIdOpt, zkConnectOpt)
+
+   // The property names below must be the ones ConsumerConfig reads; a key it does not
+   // look up is silently ignored and the command-line option then has no effect.
+   val props = new Properties()
+   props.put("groupid", options.valueOf(groupIdOpt))
+   // ConsumerConfig reads "socket.buffersize" (not "socket.buffer.size")
+   props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString)
+   props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
+   // ConsumerConfig reads "autocommit.enable" (not "auto.commit")
+   props.put("autocommit.enable", "true")
+   props.put("autocommit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
+   props.put("autooffset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
+   props.put("zk.connect", options.valueOf(zkConnectOpt))
+   val config = new ConsumerConfig(props)
+   val skipMessageOnError = options.has(skipMessageOnErrorOpt)
+
+   val topic = options.valueOf(topicIdOpt)
+   val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
+   val formatterArgs = tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
+
+   // -1 means "no limit"
+   val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
+
+   val connector = Consumer.create(config)
+
+   Runtime.getRuntime.addShutdownHook(new Thread() {
+     override def run() {
+       connector.shutdown()
+       // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
+       if(!options.has(groupIdOpt))
+         tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt))
+     }
+   })
+
+   // a single stream is requested for the topic, so take the first (only) one
+   val stream: KafkaMessageStream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0)
+   val iter =
+     if(maxMessages >= 0)
+       stream.slice(0, maxMessages)
+     else
+       stream
+
+   val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+   formatter.init(formatterArgs)
+
+   try {
+     for(message <- iter) {
+       try {
+         formatter.writeTo(message, System.out)
+       } catch {
+         case e =>
+           if (skipMessageOnError)
+             logger.error("error processing message, skipping and resume consumption: " + e)
+           else
+             throw e
+       }
+     }
+   } catch {
+     // any failure ends consumption; it is logged and the cleanup below still runs
+     case e => logger.error("error processing message, stop consuming: " + e)
+   }
+
+   System.out.flush()
+   formatter.close()
+   connector.shutdown()
+ }
+
+ /**
+  * Run the option parser over the command line.
+  * When parsing fails with an OptionException, its message is handed to Utils.croak; null is
+  * the result if that call returns.
+  */
+ def tryParse(parser: OptionParser, args: Array[String]) =
+   try {
+     parser.parse(args : _*)
+   } catch {
+     case parseError: OptionException =>
+       Utils.croak(parseError.getMessage)
+       null
+   }
+
+ /**
+  * Verify that every option in `required` was supplied. For a missing option an error is
+  * logged, the parser's help text is printed to standard error and the JVM exits with status 1.
+  */
+ def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
+   for(missing <- required if !options.has(missing)) {
+     logger.error("Missing required argument \"" + missing + "\"")
+     parser.printHelpOn(System.err)
+     System.exit(1)
+   }
+ }
+
+ /**
+  * Turn "key=value" strings into a Properties object.
+  *
+  * Each argument is split at its FIRST '=' only, so a value may itself contain '='
+  * (e.g. "url=http://host/?a=b") and may be empty ("key="). An argument without any '='
+  * is invalid: the arguments are printed to standard error and the JVM exits with status 1.
+  *
+  * @param args the raw "key=value" arguments
+  * @return the parsed key/value pairs
+  */
+ def tryParseFormatterArgs(args: Iterable[String]): Properties = {
+   // Arguments made up solely of '=' characters have always been dropped silently; keep doing that.
+   val candidates = args.filterNot(arg => arg.length > 0 && arg.forall(_ == '='))
+   // limit 2: split at the first '=' only and keep a trailing empty value
+   val pairs = candidates.map(_.split("=", 2))
+   if(!pairs.forall(_.length == 2)) {
+     System.err.println("Invalid parser arguments: " + args.mkString(" "))
+     System.exit(1)
+   }
+   val props = new Properties
+   for(pair <- pairs)
+     props.put(pair(0), pair(1))
+   props
+ }
+
+ /**
+  * Strategy for rendering consumed messages. The console consumer calls init once before the
+  * first message, writeTo once per message, and close once after consumption has ended.
+  */
+ trait MessageFormatter {
+   /** Write a single message to the given stream. */
+   def writeTo(message: Message, output: PrintStream): Unit
+
+   /** Receive the formatter properties; does nothing unless overridden. */
+   def init(props: Properties): Unit = {}
+
+   /** Release whatever the formatter holds; does nothing unless overridden. */
+   def close(): Unit = {}
+ }
+
+ /**
+  * Formatter that writes the raw payload bytes of each message followed by a newline.
+  */
+ class NewlineMessageFormatter extends MessageFormatter {
+   /**
+    * Write the readable bytes of the payload, i.e. those between the buffer's position and
+    * its limit, then a '\n'. The payload buffer's own position is left untouched.
+    */
+   def writeTo(message: Message, output: PrintStream) {
+     val payload = message.payload
+     if(payload.hasArray) {
+       // the readable region starts at arrayOffset + position and is `remaining` bytes long;
+       // using `limit` as the length is only right when the position happens to be 0
+       output.write(payload.array, payload.arrayOffset + payload.position, payload.remaining)
+     } else {
+       // no accessible backing array (e.g. direct or read-only buffer): copy through a duplicate
+       val bytes = new Array[Byte](payload.remaining)
+       payload.duplicate.get(bytes)
+       output.write(bytes, 0, bytes.length)
+     }
+     output.write('\n')
+   }
+ }
+
+ /**
+  * Best-effort removal of the zookeeper data stored under /consumers/<groupId>.
+  * Any failure is deliberately ignored; the zookeeper client is closed even when the
+  * delete fails.
+  *
+  * @param zkUrl   zookeeper connection string
+  * @param groupId consumer group whose data is removed
+  */
+ def tryCleanupZookeeper(zkUrl: String, groupId: String) {
+   try {
+     val dir = "/consumers/" + groupId
+     logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
+     val zk = new ZkClient(zkUrl, 30*1000, 30*1000, StringSerializer)
+     try {
+       zk.deleteRecursive(dir)
+     } finally {
+       // always release the connection, also when the delete throws
+       zk.close()
+     }
+   } catch {
+     case _ => // swallow
+   }
+ }
+
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.Properties
+import kafka.utils.{ZKConfig, Utils}
+import kafka.api.OffsetRequest
+
+/**
+ * Default values used by the ConsumerConfig class when a property is not set.
+ * Time values are in milliseconds, sizes in bytes.
+ */
+object ConsumerConfig {
+  val SocketTimeout: Int = 30 * 1000
+  val SocketBufferSize: Int = 64 * 1024
+  val FetchSize: Int = 300 * 1024
+  val MaxFetchSize: Int = 10 * FetchSize
+  val BackoffIncrementMs: Int = 1000
+  val AutoCommit: Boolean = true
+  val AutoCommitInterval: Int = 10 * 1000
+  val MaxQueuedChunks: Int = 100
+  val AutoOffsetReset = OffsetRequest.SmallestTimeString
+  // a negative timeout is treated by ConsumerIterator as "block until data arrives"
+  val ConsumerTimeoutMs: Int = -1
+  val EmbeddedConsumerTopics: String = ""
+}
+
+/**
+ * Consumer settings read from a Properties object; a property that is not set falls back
+ * to the default declared in the ConsumerConfig companion object.
+ */
+class ConsumerConfig(props: Properties) extends ZKConfig(props) {
+  import ConsumerConfig._
+
+  /** a string that uniquely identifies a set of consumers within the same consumer group */
+  val groupId = Utils.getString(props, "groupid")
+
+  /** consumer id: generated automatically if not set.
+   *  Set this explicitly for only testing purpose. */
+  val consumerId: Option[String] = Utils.getString(props, "consumerid", null) match {
+    case null => None
+    case _ => Some(Utils.getString(props, "consumerid"))
+  }
+
+  /** the socket timeout for network requests */
+  val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", SocketTimeout)
+
+  /** the socket receive buffer for network requests */
+  val socketBufferSize = Utils.getInt(props, "socket.buffersize", SocketBufferSize)
+
+  /** the number of bytes of messages to attempt to fetch */
+  val fetchSize = Utils.getInt(props, "fetch.size", FetchSize)
+
+  /** the maximum allowable fetch size for a very large message: ten times the fetch size */
+  val maxFetchSize: Int = fetchSize * 10
+
+  /** to avoid repeatedly polling a broker node which has no new data
+   *  we will back off every time we get an empty set from the broker */
+  val backoffIncrementMs: Long = Utils.getInt(props, "backoff.increment.ms", BackoffIncrementMs)
+
+  /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
+  val autoCommit = Utils.getBoolean(props, "autocommit.enable", AutoCommit)
+
+  /** the frequency in ms that the consumer offsets are committed to zookeeper */
+  val autoCommitIntervalMs = Utils.getInt(props, "autocommit.interval.ms", AutoCommitInterval)
+
+  /** max number of chunks buffered for consumption */
+  val maxQueuedChunks = Utils.getInt(props, "queuedchunks.max", MaxQueuedChunks)
+
+  /** what to do if an offset is out of range.
+   *    smallest      : automatically reset the offset to the smallest offset
+   *    largest       : automatically reset the offset to the largest offset
+   *    anything else : throw exception to the consumer */
+  val autoOffsetReset = Utils.getString(props, "autooffset.reset", AutoOffsetReset)
+
+  /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
+  val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
+
+  /** embed a consumer in the broker. e.g., topic1:1,topic2:1 */
+  val embeddedConsumerTopicMap = Utils.getConsumerTopicMap(
+    Utils.getString(props, "embeddedconsumer.topics", EmbeddedConsumerTopics))
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import scala.collection._
+import kafka.utils.Utils
+import org.apache.log4j.Logger
+
+/**
+ * Main interface for a consumer: creates message streams, commits offsets and shuts down.
+ */
+trait ConsumerConnector {
+  /**
+   * Create a list of message streams for each requested topic.
+   *
+   * @param topicCountMap a map of (topic, #streams) pairs
+   * @return a map of (topic, list of KafkaMessageStream) pairs. Each list holds #streams
+   *         entries, and each KafkaMessageStream supports an iterator of messages.
+   */
+  def createMessageStreams(topicCountMap: Map[String,Int]): Map[String,List[KafkaMessageStream]]
+
+  /**
+   * Commit the offsets of all broker partitions connected by this connector.
+   */
+  def commitOffsets: Unit
+
+  /**
+   * Shut down the connector.
+   */
+  def shutdown(): Unit
+}
+
+/**
+ * Factory for consumer connectors. Each created connector is also registered as an MBean
+ * under a fixed name; a failure to register is only logged as a warning.
+ */
+object Consumer {
+  private val logger = Logger.getLogger(getClass())
+  private val consumerStatsMBeanName = "kafka:type=kafka.ConsumerStats"
+
+  /**
+   * Create a ConsumerConnector for Scala callers.
+   *
+   * @param config at the minimum, need to specify the groupid of the consumer and the zookeeper
+   *               connection string zk.connect.
+   */
+  def create(config: ConsumerConfig): ConsumerConnector = {
+    val connector = new ZookeeperConsumerConnector(config)
+    Utils.swallow(logger.warn, Utils.registerMBean(connector, consumerStatsMBeanName))
+    connector
+  }
+
+  /**
+   * Create a ConsumerConnector exposing the Java API; the MBean registered is the
+   * connector's underlying Scala connector.
+   *
+   * @param config at the minimum, need to specify the groupid of the consumer and the zookeeper
+   *               connection string zk.connect.
+   */
+  def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector = {
+    val javaConnector = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
+    Utils.swallow(logger.warn, Utils.registerMBean(javaConnector.underlying, consumerStatsMBeanName))
+    javaConnector
+  }
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import kafka.utils.IteratorTemplate
+import org.apache.log4j.Logger
+import java.util.concurrent.{TimeUnit, BlockingQueue}
+import kafka.cluster.Partition
+import kafka.message.{MessageAndOffset, MessageSet, Message}
+
+/**
+ * An iterator over the messages of the data chunks taken from the supplied queue.
+ * With a negative consumerTimeoutMs it blocks until a chunk is available; otherwise it waits
+ * at most that many milliseconds and then throws ConsumerTimeoutException.
+ * Iteration ends when ZookeeperConsumerConnector.shutdownCommand is read from the queue.
+ */
+class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int)
+  extends IteratorTemplate[Message] {
+
+  private val logger = Logger.getLogger(classOf[ConsumerIterator])
+  private var current: Iterator[MessageAndOffset] = null
+  private var currentDataChunk: FetchedDataChunk = null
+  private var currentTopicInfo: PartitionTopicInfo = null
+  private var consumedOffset: Long = -1L
+
+  /**
+   * Return the next message and record its offset as the consumed offset of the partition
+   * the message came from.
+   */
+  override def next(): Message = {
+    val nextMessage = super.next
+    if(consumedOffset < 0)
+      throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
+    currentTopicInfo.resetConsumeOffset(consumedOffset)
+    if(logger.isTraceEnabled)
+      logger.trace("Setting consumed offset to %d".format(consumedOffset))
+    nextMessage
+  }
+
+  /**
+   * Produce the next message, moving on to the next queued chunk when the current one is
+   * used up; yields allDone once the shutdown command has been read.
+   */
+  protected def makeNext(): Message = {
+    val chunkExhausted = current == null || !current.hasNext
+    if(chunkExhausted && !advanceToNextChunk())
+      allDone
+    else {
+      val item = current.next
+      consumedOffset = item.offset
+      item.message
+    }
+  }
+
+  /**
+   * Take the next chunk off the queue and make its messages the current iterator.
+   * @return false when the chunk was the shutdown command (which is put back on the queue)
+   */
+  private def advanceToNextChunk(): Boolean = {
+    if(consumerTimeoutMs >= 0) {
+      currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
+      if(currentDataChunk == null)
+        throw new ConsumerTimeoutException
+    } else
+      currentDataChunk = channel.take
+
+    if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
+      if(logger.isDebugEnabled)
+        logger.debug("Received the shutdown command")
+      // re-queue the command so it can be read from this queue again
+      channel.offer(currentDataChunk)
+      false
+    } else {
+      currentTopicInfo = currentDataChunk.topicInfo
+      if(currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
+        logger.error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
+                       .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
+        currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
+      }
+      current = currentDataChunk.messages.iterator
+      true
+    }
+  }
+
+}
+
+/**
+ * Thrown by ConsumerIterator when a non-negative consumer timeout is configured and no data
+ * chunk arrives on its queue within that many milliseconds.
+ */
+class ConsumerTimeoutException() extends RuntimeException()
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import kafka.message.ByteBufferMessageSet
+
+/**
+ * One unit of fetched data handed from a fetcher to a ConsumerIterator through a queue.
+ *
+ * @param messages    the fetched message set
+ * @param topicInfo   the PartitionTopicInfo that enqueued this chunk
+ * @param fetchOffset the fetch offset supplied by the enqueuer; ConsumerIterator compares it
+ *                    with the partition's consumed offset before iterating the chunk
+ */
+private[consumer] class FetchedDataChunk(val messages: ByteBufferMessageSet,
+ val topicInfo: PartitionTopicInfo,
+ val fetchOffset: Long)
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import scala.collection._
+import org.apache.log4j.Logger
+import kafka.cluster._
+import org.I0Itec.zkclient.ZkClient
+import java.util.concurrent.BlockingQueue
+
+/**
+ * Manages the fetcher threads of a consumer: one FetcherRunnable per broker, each fetching
+ * the partitions that live on that broker.
+ */
+private[consumer] class Fetcher(val config: ConsumerConfig, val zkClient : ZkClient) {
+  private val logger = Logger.getLogger(getClass())
+  private val EMPTY_FETCHER_THREADS = new Array[FetcherRunnable](0)
+  @volatile
+  private var fetcherThreads : Array[FetcherRunnable] = EMPTY_FETCHER_THREADS
+
+  /**
+   * Shut down all fetcher threads, if any, and forget them.
+   */
+  def shutdown() {
+    fetcherThreads.foreach(_.shutdown())
+    fetcherThreads = EMPTY_FETCHER_THREADS
+  }
+
+  /**
+   * Stop the current fetcher threads and, unless topicInfos is null, clear the given queues
+   * and start one new fetcher thread per broker referenced by topicInfos.
+   */
+  def initConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
+                      queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
+    shutdown()
+
+    if (topicInfos != null) {
+      queuesTobeCleared.foreach(_.clear())
+
+      // group the partitions by broker id; each new entry is prepended to its broker's list
+      val infosByBroker = new mutable.HashMap[Int, List[PartitionTopicInfo]]
+      for(info <- topicInfos)
+        infosByBroker.put(info.brokerId, info :: infosByBroker.getOrElse(info.brokerId, Nil))
+
+      // open a new fetcher thread for each broker
+      val brokerIds = Set() ++ topicInfos.map(_.brokerId)
+      val brokers = brokerIds.map(cluster.getBroker(_))
+      fetcherThreads = new Array[FetcherRunnable](brokers.size)
+      brokers.iterator.zipWithIndex.foreach { case (broker, index) =>
+        val fetcherThread = new FetcherRunnable("FetchRunnable-" + index, zkClient, config, broker,
+                                                infosByBroker.get(broker.id).get)
+        fetcherThreads(index) = fetcherThread
+        fetcherThread.start
+      }
+    }
+  }
+}
+
+
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.concurrent.CountDownLatch
+import org.apache.log4j.Logger
+import java.nio.channels.{ClosedChannelException, ClosedByInterruptException}
+import kafka.common.{OffsetOutOfRangeException, ErrorMapping}
+import kafka.cluster.{Partition, Broker}
+import kafka.api.{MultiFetchResponse, OffsetRequest, FetchRequest}
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils._
+import java.io.IOException
+
+class FetcherRunnable(val name: String,
+ val zkClient : ZkClient,
+ val config: ConsumerConfig,
+ val broker: Broker,
+ val partitionTopicInfos: List[PartitionTopicInfo])
+ extends Thread(name) {
+ private val logger = Logger.getLogger(getClass())
+ private val shutdownLatch = new CountDownLatch(1)
+ private val simpleConsumer = new SimpleConsumer(broker.host, broker.port, config.socketTimeoutMs,
+ config.socketBufferSize)
+ @volatile
+ private var stopped = false
+
+ /**
+  * Ask the fetcher to stop and block until its run() method has signalled completion.
+  * The thread is interrupted after the stop flag is set.
+  */
+ def shutdown(): Unit = {
+   stopped = true
+   interrupt()
+   logger.debug("awaiting shutdown on fetcher " + name)
+   shutdownLatch.await()
+   logger.debug("shutdown of fetcher " + name + " thread complete")
+ }
+
+ /**
+ * Fetch loop. Until stopped, issues one multifetch covering every partition of this fetcher,
+ * enqueues each returned message set on its partition's queue and sleeps for the configured
+ * backoff when a round yielded no bytes. Any exception ends the loop; afterwards the
+ * connection is closed and the shutdown latch is released.
+ */
+ override def run() {
+ for (info <- partitionTopicInfos)
+ logger.info(name + " start fetching topic: " + info.topic + " part: " + info.partition.partId + " offset: "
+ + info.getFetchOffset + " from " + broker.host + ":" + broker.port)
+
+ try {
+ while (!stopped) {
+ // one fetch request per partition, each starting at that partition's current fetch offset
+ val fetches = partitionTopicInfos.map(info =>
+ new FetchRequest(info.topic, info.partition.partId, info.getFetchOffset, config.fetchSize))
+
+ if (logger.isTraceEnabled)
+ logger.trace("fetch request: " + fetches.toString)
+
+ val response = simpleConsumer.multifetch(fetches : _*)
+
+ // total number of bytes enqueued in this round
+ var read = 0L
+
+ // the response is paired with the partitions positionally
+ for((messages, info) <- response.zip(partitionTopicInfos)) {
+ try {
+ // done == true means the offsets were reset and this message set is not enqueued
+ var done = false
+ if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) {
+ logger.info("offset " + info.getFetchOffset + " out of range")
+ // see if we can fix this error
+ val resetOffset = resetConsumerOffsets(info.topic, info.partition)
+ if(resetOffset >= 0) {
+ info.resetFetchOffset(resetOffset)
+ info.resetConsumeOffset(resetOffset)
+ done = true
+ }
+ }
+ if (!done)
+ read += info.enqueue(messages, info.getFetchOffset)
+ }
+ catch {
+ case e1: IOException =>
+ // something is wrong with the socket, re-throw the exception to stop the fetcher
+ throw e1
+ case e2 =>
+ if (!stopped) {
+ // this is likely a repeatable error, log it and trigger an exception in the consumer
+ logger.error("error in FetcherRunnable for " + info, e2)
+ info.enqueueError(e2, info.getFetchOffset)
+ }
+ // re-throw the exception to stop the fetcher
+ throw e2
+ }
+ }
+
+ if (logger.isTraceEnabled)
+ logger.trace("fetched bytes: " + read)
+ // nothing was fetched in this round: back off before asking the broker again
+ if(read == 0) {
+ logger.debug("backing off " + config.backoffIncrementMs + " ms")
+ Thread.sleep(config.backoffIncrementMs)
+ }
+ }
+ }
+ catch {
+ // every exception ends the loop; one raised after shutdown() set the flag is expected
+ case e =>
+ if (stopped)
+ logger.info("FecherRunnable " + this + " interrupted")
+ else
+ logger.error("error in FetcherRunnable ", e)
+ }
+
+ logger.info("stopping fetcher " + name + " to host " + broker.host)
+ Utils.swallow(logger.info, simpleConsumer.close)
+ // releases the thread blocked in shutdown()
+ shutdownComplete()
+ }
+
+ /**
+  * Signal that this thread has finished shutting down, releasing a caller blocked in shutdown().
+  */
+ private def shutdownComplete(): Unit = shutdownLatch.countDown()
+
+ /**
+  * Try to recover from an out-of-range offset according to the "autooffset.reset" setting:
+  * ask the broker for its earliest or latest offset and store that offset in zookeeper for
+  * this consumer group.
+  *
+  * @param topic     topic of the affected partition
+  * @param partition the affected partition
+  * @return the new offset, or -1 when no reset is configured or the broker returned no offset
+  */
+ private def resetConsumerOffsets(topic : String,
+                                  partition: Partition) : Long = {
+   val requestTime: Long = config.autoOffsetReset match {
+     case OffsetRequest.SmallestTimeString => OffsetRequest.EarliestTime
+     case OffsetRequest.LargestTimeString => OffsetRequest.LatestTime
+     case _ => return -1
+   }
+
+   // get mentioned offset from the broker
+   val offsets = simpleConsumer.getOffsetsBefore(topic, partition.partId, requestTime, 1)
+   if(offsets.length == 0) {
+     // nothing to reset to; report "cannot fix" the same way as an unconfigured reset
+     logger.error("no offset returned by the broker for partition " + partition.name + ", cannot reset")
+     -1
+   } else {
+     val newOffset = offsets(0)
+     val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
+
+     // reset manually in zookeeper
+     logger.info("updating partition " + partition.name + " with " + (if(requestTime == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + newOffset)
+     ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition.name, newOffset.toString)
+
+     newOffset
+   }
+ }
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.concurrent.BlockingQueue
+import org.apache.log4j.Logger
+import kafka.message.Message
+
+
+/**
+ * An iterable view over the messages carried by a queue of fetched data chunks.
+ * A single ConsumerIterator is created at construction and every call to iterator() returns
+ * that same instance, so all callers share one position in the stream.
+ *
+ * NOTE(review): the queue is presumably filled by separate fetcher threads, and the
+ * thread-safety of the shared iterator is not established in this class -- confirm before
+ * iterating from several threads.
+ */
+class KafkaMessageStream(private val queue: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int)
+ extends Iterable[Message] with java.lang.Iterable[Message]{
+
+ private val logger = Logger.getLogger(getClass())
+ private val iter: ConsumerIterator = new ConsumerIterator(queue, consumerTimeoutMs)
+
+ /**
+ * Return the stream's single shared iterator; no new iterator is created per call.
+ */
+ def iterator(): ConsumerIterator = iter
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.nio.channels._
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import kafka.message._
+import kafka.cluster._
+import kafka.common.ErrorMapping
+import org.apache.log4j.Logger
+
+/**
+ * Consumer-side state of one topic partition: the queue its fetched chunks are put on and
+ * the offsets up to which data has been fetched and consumed.
+ */
+private[consumer] class PartitionTopicInfo(val topic: String,
+                                           val brokerId: Int,
+                                           val partition: Partition,
+                                           private val chunkQueue: BlockingQueue[FetchedDataChunk],
+                                           private val consumedOffset: AtomicLong,
+                                           private val fetchedOffset: AtomicLong,
+                                           private val fetchSize: AtomicInteger) {
+  private val logger = Logger.getLogger(getClass())
+  if (logger.isDebugEnabled) {
+    logger.debug("initial consumer offset of " + this + " is " + consumedOffset.get)
+    logger.debug("initial fetch offset of " + this + " is " + fetchedOffset.get)
+  }
+
+  /** Current consumed offset. */
+  def getConsumeOffset(): Long = consumedOffset.get
+
+  /** Current fetch offset. */
+  def getFetchOffset(): Long = fetchedOffset.get
+
+  /** Overwrite the consumed offset. */
+  def resetConsumeOffset(newConsumeOffset: Long): Unit = {
+    consumedOffset.set(newConsumeOffset)
+    if (logger.isDebugEnabled)
+      logger.debug("reset consume offset of " + this + " to " + newConsumeOffset)
+  }
+
+  /** Overwrite the fetch offset. */
+  def resetFetchOffset(newFetchOffset: Long): Unit = {
+    fetchedOffset.set(newFetchOffset)
+    if (logger.isDebugEnabled)
+      logger.debug("reset fetch offset of ( %s ) to %d".format(this, newFetchOffset))
+  }
+
+  /**
+   * Enqueue a message set for processing. A set without valid bytes is not enqueued.
+   * @return the number of valid bytes
+   */
+  def enqueue(messages: ByteBufferMessageSet, fetchOffset: Long): Long = {
+    val validBytes = messages.shallowValidBytes
+    if(validBytes > 0) {
+      // update fetched offset to the compressed data chunk size, not the decompressed message set size
+      if(logger.isTraceEnabled)
+        logger.trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + validBytes)
+      val advancedOffset = fetchedOffset.addAndGet(validBytes)
+      if (logger.isDebugEnabled)
+        logger.debug("updated fetch offset of ( %s ) to %d".format(this, advancedOffset))
+      chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
+    }
+    validBytes
+  }
+
+  /**
+   * Add an empty message set carrying the error code of the exception to the queue so that
+   * the client can see the error.
+   */
+  def enqueueError(e: Throwable, fetchOffset: Long): Unit = {
+    val errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+    val errorSet = new ByteBufferMessageSet(ErrorMapping.EmptyByteBuffer, errorCode)
+    chunkQueue.put(new FetchedDataChunk(errorSet, this, fetchOffset))
+  }
+
+  override def toString(): String =
+    topic + ":" + partition.toString + ": fetched offset = " + fetchedOffset.get +
+      ": consumed offset = " + consumedOffset.get
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.net._
+import java.nio._
+import java.nio.channels._
+import java.util.concurrent.atomic._
+import org.apache.log4j.Logger
+import kafka.api._
+import kafka.common._
+import kafka.message._
+import kafka.network._
+import kafka.utils._
+
+/**
+ * A consumer of kafka messages that talks to one broker over a single blocking socket channel.
+ *
+ * Every public operation runs while holding one internal lock. The connection is created on
+ * first use; a request that fails with an IOException is retried exactly once on a fresh
+ * connection, and if the retry fails as well the connection is closed and the exception is
+ * propagated to the caller.
+ */
+@threadsafe
+class SimpleConsumer(val host: String,
+                     val port: Int,
+                     val soTimeout: Int,
+                     val bufferSize: Int) {
+  private val logger = Logger.getLogger(getClass())
+  private var channel : SocketChannel = null
+  private val lock = new Object()
+
+  /**
+   * Close any existing connection and open a new blocking channel to host:port.
+   *
+   * If configuring or connecting the new channel fails, that channel is closed before the
+   * exception is rethrown so that the underlying socket is not leaked.
+   *
+   * @return the connected channel; the caller is responsible for storing it
+   */
+  private def connect(): SocketChannel = {
+    close()
+    val address = new InetSocketAddress(host, port)
+    val newChannel = SocketChannel.open
+    try {
+      newChannel.configureBlocking(true)
+      newChannel.socket.setReceiveBufferSize(bufferSize)
+      newChannel.socket.setSoTimeout(soTimeout)
+      newChannel.socket.setKeepAlive(true)
+      newChannel.connect(address)
+      // log only once the connection has really been established
+      if(logger.isDebugEnabled)
+        logger.debug("Connected to " + address + " for fetching.")
+      if(logger.isTraceEnabled) {
+        logger.trace("requested receive buffer size=" + bufferSize + " actual receive buffer size= " + newChannel.socket.getReceiveBufferSize)
+        logger.trace("soTimeout=" + soTimeout + " actual soTimeout= " + newChannel.socket.getSoTimeout)
+      }
+      newChannel
+    } catch {
+      case e: Throwable =>
+        // the channel never became visible to anyone else: release it here, then rethrow
+        Utils.swallow(logger.warn, newChannel.close())
+        throw e
+    }
+  }
+
+  /** Close the given channel and its socket; failures are logged as warnings and ignored. */
+  private def close(channel: SocketChannel) = {
+    if(logger.isDebugEnabled)
+      logger.debug("Disconnecting from " + channel.socket.getRemoteSocketAddress())
+    Utils.swallow(logger.warn, channel.close())
+    Utils.swallow(logger.warn, channel.socket.close())
+  }
+
+  /** Close the current connection, if there is one. A later request reconnects. */
+  def close(): Unit = {
+    lock synchronized {
+      if (channel != null)
+        close(channel)
+      channel = null
+    }
+  }
+
+  /**
+   * Fetch a set of messages from a topic.
+   *
+   * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
+   * @return a set of fetched messages, carrying the error code read from the response
+   */
+  def fetch(request: FetchRequest): ByteBufferMessageSet = {
+    lock synchronized {
+      val startTime = SystemTime.nanoseconds
+      val (receive, errorCode) = sendAndReceive(request, "fetch")
+      val endTime = SystemTime.nanoseconds
+      SimpleConsumerStats.recordFetchRequest(endTime - startTime)
+      SimpleConsumerStats.recordConsumptionThroughput(receive.buffer.limit)
+      new ByteBufferMessageSet(receive.buffer, request.offset, errorCode)
+    }
+  }
+
+  /**
+   * Combine multiple fetch requests in one call.
+   *
+   * @param fetches a sequence of fetch requests.
+   * @return a sequence of fetch responses
+   */
+  def multifetch(fetches: FetchRequest*): MultiFetchResponse = {
+    lock synchronized {
+      val startTime = SystemTime.nanoseconds
+      val fetchArray = fetches.toArray
+      val (receive, _) = sendAndReceive(new MultiFetchRequest(fetchArray), "multifetch")
+      val endTime = SystemTime.nanoseconds
+      SimpleConsumerStats.recordFetchRequest(endTime - startTime)
+      SimpleConsumerStats.recordConsumptionThroughput(receive.buffer.limit)
+
+      // error code will be set on individual messageset inside MultiFetchResponse
+      new MultiFetchResponse(receive.buffer, fetches.length, fetchArray.map(f => f.offset))
+    }
+  }
+
+  /**
+   * Get a list of valid offsets (up to maxNumOffsets) before the given time.
+   * The result is a list of offsets, in descending order.
+   *
+   * @param topic the topic to query
+   * @param partition the partition of the topic to query
+   * @param time time in millisecs (-1, from the latest offset available, -2 from the smallest offset available)
+   * @param maxNumOffsets the maximum number of offsets requested
+   * @return an array of offsets
+   */
+  def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] = {
+    lock synchronized {
+      val request = new OffsetRequest(topic, partition, time, maxNumOffsets)
+      // the error code of the response is not inspected here
+      val (receive, _) = sendAndReceive(request, "getOffsetsBefore")
+      OffsetRequest.deserializeOffsetArray(receive.buffer)
+    }
+  }
+
+  /**
+   * Send a request and read its response, connecting first if necessary.
+   *
+   * On an IOException the connection is re-established and the request is sent once more.
+   * If that second attempt also fails with an IOException, the connection is closed (so the
+   * socket is released and the next call starts from a clean state) and the exception is
+   * rethrown. Other exceptions propagate unchanged.
+   *
+   * Callers must hold the lock.
+   *
+   * @param request the request to send
+   * @param operation name of the calling operation, used in the reconnect log message
+   * @return the received response together with its error code
+   */
+  private def sendAndReceive(request: Request, operation: String): Tuple2[Receive,Int] = {
+    getOrMakeConnection()
+    try {
+      sendRequest(request)
+      getResponse()
+    } catch {
+      case e : java.io.IOException =>
+        logger.info(operation + " reconnect due to " + e)
+        // retry once
+        try {
+          channel = connect()
+          sendRequest(request)
+          getResponse()
+        } catch {
+          case ioe: java.io.IOException =>
+            // closes the retry channel (if any) and resets the field to null
+            close()
+            throw ioe
+        }
+    }
+  }
+
+  /** Serialize the request and write it completely to the current channel. */
+  private def sendRequest(request: Request) = {
+    val send = new BoundedByteBufferSend(request)
+    send.writeCompletely(channel)
+  }
+
+  /** Read one complete response from the current channel and extract its leading error code. */
+  private def getResponse(): Tuple2[Receive,Int] = {
+    val response = new BoundedByteBufferReceive()
+    response.readCompletely(channel)
+
+    // this has the side effect of setting the initial position of buffer correctly
+    val errorCode: Int = response.buffer.getShort
+    (response, errorCode)
+  }
+
+  /** Open the connection if none exists yet. */
+  private def getOrMakeConnection(): Unit = {
+    if(channel == null) {
+      channel = connect()
+    }
+  }
+}
+
+trait SimpleConsumerStatsMBean { // management view of fetch statistics; implemented by SimpleConsumerStats, which is registered via Utils.registerMBean
+ def getFetchRequestsPerSecond: Double // fetch request rate as reported by the underlying SnapshotStats
+ def getAvgFetchRequestMs: Double // average fetch request time; the implementation divides the recorded nanosecond metric by 10^6
+ def getMaxFetchRequestMs: Double // maximum fetch request time; likewise scaled from nanoseconds to milliseconds
+ def getNumFetchRequests: Long // number of fetch requests recorded
+ def getConsumerThroughput: Double // throughput metric fed with response buffer limits (presumably bytes per second - confirm against SnapshotStats)
+}
+
+/**
+ * Fetch statistics of the simple consumer, backed by a single SnapshotStats instance.
+ * Request durations are recorded in nanoseconds and exposed in milliseconds.
+ */
+@threadsafe
+class SimpleConsumerStats extends SimpleConsumerStatsMBean {
+  // divisor used to turn a nanosecond metric into milliseconds
+  private val NanosPerMilli = 1000.0 * 1000.0
+  private val snapshot = new SnapshotStats
+
+  /** Record the duration of one fetch request, given in nanoseconds. */
+  def recordFetchRequest(requestNs: Long) = {
+    snapshot.recordRequestMetric(requestNs)
+  }
+
+  /** Record an amount of consumed data for the throughput metric. */
+  def recordConsumptionThroughput(data: Long) = {
+    snapshot.recordThroughputMetric(data)
+  }
+
+  def getFetchRequestsPerSecond: Double = {
+    snapshot.getRequestsPerSecond
+  }
+
+  def getAvgFetchRequestMs: Double = {
+    snapshot.getAvgMetric / NanosPerMilli
+  }
+
+  def getMaxFetchRequestMs: Double = {
+    snapshot.getMaxMetric / NanosPerMilli
+  }
+
+  def getNumFetchRequests: Long = {
+    snapshot.getNumRequests
+  }
+
+  def getConsumerThroughput: Double = {
+    snapshot.getThroughput
+  }
+}
+
+/**
+ * Holder of the single SimpleConsumerStats instance. Initialising this object registers that
+ * instance as an MBean; a registration failure is logged as a warning and otherwise ignored.
+ */
+object SimpleConsumerStats {
+  private val logger = Logger.getLogger(getClass())
+  private val mbeanName = "kafka:type=kafka.SimpleConsumerStats"
+  private val stats = new SimpleConsumerStats
+  Utils.swallow(logger.warn, Utils.registerMBean(stats, mbeanName))
+
+  /**
+   * Record the duration of one fetch request.
+   * Despite the parameter name, the value is forwarded unchanged to the instance method that
+   * takes nanoseconds, and SimpleConsumer passes a difference of nanosecond timestamps.
+   */
+  def recordFetchRequest(requestMs: Long) = {
+    stats.recordFetchRequest(requestMs)
+  }
+
+  /** Record an amount of consumed data for the throughput metric. */
+  def recordConsumptionThroughput(data: Long) = {
+    stats.recordConsumptionThroughput(data)
+  }
+}
+
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import scala.collection._
+import scala.util.parsing.json.JSON
+import org.apache.log4j.Logger
+
+/**
+ * Factory for TopicCount instances from their JSON representation.
+ */
+private[consumer] object TopicCount {
+  private val logger = Logger.getLogger(getClass())
+
+  // Numbers in the JSON text are converted with String.toInt. Assigning the parser's
+  // globalNumberParser happens as a side effect of initialising this object.
+  val myConversionFunc = (input: String) => input.toInt
+  JSON.globalNumberParser = myConversionFunc
+
+  /**
+   * Parse a JSON object mapping topic names to consumer counts and wrap it in a TopicCount.
+   *
+   * Any failure while parsing or converting is logged at error level together with the
+   * offending JSON text and then rethrown; a text the parser rejects results in a
+   * RuntimeException.
+   *
+   * @param consumerIdSting the consumer id stored in the resulting TopicCount
+   * @param jsonString the JSON text to parse
+   * @return the TopicCount built from the parsed map
+   */
+  def constructTopicCount(consumerIdSting: String, jsonString : String) : TopicCount = {
+    val parsedCounts: Map[String,Int] =
+      try {
+        JSON.parseFull(jsonString)
+          .getOrElse(throw new RuntimeException("error constructing TopicCount : " + jsonString))
+          .asInstanceOf[Map[String,Int]]
+      } catch {
+        case e: Throwable =>
+          logger.error("error parsing consumer json string " + jsonString, e)
+          throw e
+      }
+
+    new TopicCount(consumerIdSting, parsedCounts)
+  }
+
+}
+
+/**
+ * The number of consumer threads a consumer wants per topic.
+ *
+ * Two instances are equal when both the consumer id and the topic-to-count map are equal;
+ * hashCode is defined consistently with that.
+ *
+ * @param consumerIdString id of the consumer; used as prefix of the generated thread ids
+ * @param topicCountMap number of consumer threads per topic
+ */
+private[consumer] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) {
+
+  /**
+   * Expand the per-topic counts into thread ids.
+   *
+   * @return for every topic the set of ids consumerIdString-0 .. consumerIdString-(n-1),
+   *         where n is the count configured for that topic (asserted to be at least 1)
+   */
+  def getConsumerThreadIdsPerTopic()
+    : Map[String, Set[String]] = {
+    val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
+    for ((topic, nConsumers) <- topicCountMap) {
+      assert(nConsumers >= 1)
+      val consumerSet = new mutable.HashSet[String]
+      for (i <- 0 until nConsumers)
+        consumerSet += consumerIdString + "-" + i
+      consumerThreadIdsPerTopicMap.put(topic, consumerSet)
+    }
+    consumerThreadIdsPerTopicMap
+  }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      // a type pattern never matches null, so null falls through to the default case
+      case n: TopicCount => consumerIdString == n.consumerIdString && topicCountMap == n.topicCountMap
+      case _ => false
+    }
+  }
+
+  /**
+   * Hash over the same two fields equals compares, so that equal instances can be used
+   * reliably in hash based collections.
+   */
+  override def hashCode(): Int = {
+    val idHash = if (consumerIdString == null) 0 else consumerIdString.hashCode
+    val mapHash = if (topicCountMap == null) 0 else topicCountMap.hashCode
+    31 * idHash + mapHash
+  }
+
+  /**
+   *  return json of
+   *  { "topic1" : 4,
+   *    "topic2" : 4
+   *  }
+   *  Topic names are written verbatim between double quotes; no escaping is applied.
+   */
+  def toJsonString() : String = {
+    val entries = for ((topic, nConsumers) <- topicCountMap) yield "\"" + topic + "\": " + nConsumers
+    entries.mkString("{ ", ",", " }")
+  }
+}
|