kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1152970 [19/26] - in /incubator/kafka: branches/ site/ trunk/ trunk/bin/ trunk/clients/ trunk/clients/clojure/ trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/ trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...
Date Mon, 01 Aug 2011 23:42:17 GMT
Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer.async
+
+import java.util.concurrent.LinkedBlockingQueue
+import kafka.utils.Utils
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.log4j.{Level, Logger}
+import kafka.api.ProducerRequest
+import kafka.serializer.Encoder
+import java.lang.management.ManagementFactory
+import javax.management.ObjectName
+import java.util.{Random, Properties}
+import kafka.producer.{ProducerConfig, SyncProducer}
+
+object AsyncProducer {
+  val Shutdown = new Object
+  val Random = new Random
+  val ProducerMBeanName = "kafka.producer.Producer:type=AsyncProducerStats"
+}
+
+private[kafka] class AsyncProducer[T](config: AsyncProducerConfig,
+                                      producer: SyncProducer,
+                                      serializer: Encoder[T],
+                                      eventHandler: EventHandler[T] = null,
+                                      eventHandlerProps: Properties = null,
+                                      cbkHandler: CallbackHandler[T] = null,
+                                      cbkHandlerProps: Properties = null) {
+  private val logger = Logger.getLogger(classOf[AsyncProducer[T]])
+  private val closed = new AtomicBoolean(false)
+  private val queue = new LinkedBlockingQueue[QueueItem[T]](config.queueSize)
+  // initialize the callback handlers
+  if(eventHandler != null)
+    eventHandler.init(eventHandlerProps)
+  if(cbkHandler != null)
+    cbkHandler.init(cbkHandlerProps)
+  private val sendThread = new ProducerSendThread("ProducerSendThread-" + AsyncProducer.Random.nextInt, queue,
+    serializer, producer,
+    if(eventHandler != null) eventHandler else new DefaultEventHandler[T](new ProducerConfig(config.props), cbkHandler),
+    cbkHandler, config.queueTime, config.batchSize, AsyncProducer.Shutdown)
+  sendThread.setDaemon(false)
+
+  val asyncProducerStats = new AsyncProducerStats[T](queue)
+  val mbs = ManagementFactory.getPlatformMBeanServer
+  try {
+    val objName = new ObjectName(AsyncProducer.ProducerMBeanName)
+    if(mbs.isRegistered(objName))
+      mbs.unregisterMBean(objName)
+    mbs.registerMBean(asyncProducerStats, objName)
+  }catch {
+    case e: Exception => logger.warn("can't register AsyncProducerStats")
+  }
+
+  def this(config: AsyncProducerConfig) {
+    this(config,
+      new SyncProducer(config),
+      Utils.getObject(config.serializerClass),
+      Utils.getObject(config.eventHandler),
+      config.eventHandlerProps,
+      Utils.getObject(config.cbkHandler),
+      config.cbkHandlerProps)
+  }
+
+  def start = sendThread.start
+
+  def send(topic: String, event: T) { send(topic, event, ProducerRequest.RandomPartition) }
+
+  def send(topic: String, event: T, partition:Int) {
+    asyncProducerStats.recordEvent
+
+    if(closed.get)
+      throw new QueueClosedException("Attempt to add event to a closed queue.")
+
+    var data = new QueueItem(event, topic, partition)
+    if(cbkHandler != null)
+      data = cbkHandler.beforeEnqueue(data)
+
+    val added = queue.offer(data)
+    if(cbkHandler != null)
+      cbkHandler.afterEnqueue(data, added)
+
+    if(!added) {
+      asyncProducerStats.recordDroppedEvents
+      logger.error("Event queue is full of unsent messages, could not send event: " + event.toString)
+      throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + event.toString)
+    }else {
+      if(logger.isTraceEnabled) {
+        logger.trace("Added event to send queue for topic: " + topic + ":" + event.toString)
+        logger.trace("Remaining queue size: " + queue.remainingCapacity)
+      }
+    }
+  }
+
+  def close = {
+    if(cbkHandler != null) {
+      cbkHandler.close
+      logger.info("Closed the callback handler")
+    }
+    queue.put(new QueueItem(AsyncProducer.Shutdown.asInstanceOf[T], null, -1))
+    sendThread.shutdown
+    sendThread.awaitShutdown
+    producer.close
+    closed.set(true)
+    logger.info("Closed AsyncProducer")
+  }
+
+  // for testing only
+  def setLoggerLevel(level: Level) = logger.setLevel(level)
+}
+
+class QueueItem[T](data: T, topic: String, partition: Int) {
+  def getData: T = data
+  def getPartition: Int = partition
+  def getTopic:String = topic
+  override def toString = "topic: " + topic + ", partition: " + partition + ", data: " + data.toString
+}
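
A minimal usage sketch of the new API (the host/port keys below belong to the SyncProducerConfig side and, like the topic name, are illustrative assumptions):

    import java.util.Properties
    import kafka.producer.async.{AsyncProducer, AsyncProducerConfig}

    val props = new Properties
    props.put("host", "localhost")   // assumed SyncProducerConfig keys
    props.put("port", "9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    // the auxiliary constructor wires up the SyncProducer and serializer
    val producer = new AsyncProducer[String](new AsyncProducerConfig(props))
    producer.start
    producer.send("test-topic", "hello")  // enqueued; ProducerSendThread sends it
    producer.close                        // drains the queue and stops the send thread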

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.producer.async
+
+import java.util.Properties
+import kafka.utils.Utils
+import kafka.producer.SyncProducerConfig
+
+class AsyncProducerConfig(override val props: Properties) extends SyncProducerConfig(props)
+        with AsyncProducerConfigShared {
+}
+
+trait AsyncProducerConfigShared {
+  val props: Properties
+
+  /* maximum time, in milliseconds, for buffering data on the producer queue */
+  val queueTime = Utils.getInt(props, "queue.time", 5000)
+
+  /** the maximum size of the blocking queue for buffering on the producer */
+  val queueSize = Utils.getInt(props, "queue.size", 10000)
+
+  /** the number of messages batched at the producer */
+  val batchSize = Utils.getInt(props, "batch.size", 200)
+
+  /** the serializer class for events */
+  val serializerClass = Utils.getString(props, "serializer.class", "kafka.serializer.DefaultEncoder")
+
+  /** the callback handler for one or multiple events */
+  val cbkHandler = Utils.getString(props, "callback.handler", null)
+
+  /** properties required to initialize the callback handler */
+  val cbkHandlerProps = Utils.getProps(props, "callback.handler.props", null)
+
+  /** the handler for events */
+  val eventHandler = Utils.getString(props, "event.handler", null)
+
+  /** properties required to initialize the event handler */
+  val eventHandlerProps = Utils.getProps(props, "event.handler.props", null)
+}
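
A sketch of how these knobs compose in practice (values are illustrative, and the two handler class names are hypothetical placeholders):

    import java.util.Properties

    val props = new Properties
    // plus the SyncProducerConfig keys (host, port) as in the earlier sketch
    props.put("queue.time", "2000")    // flush a batch at least every 2 seconds
    props.put("queue.size", "50000")   // deeper in-memory buffer before events drop
    props.put("batch.size", "100")     // dispatch once 100 events accumulate
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    // optional hooks; these class names are hypothetical
    props.put("callback.handler", "com.example.AuditCallbackHandler")
    props.put("event.handler", "com.example.RetryingEventHandler")
    val config = new AsyncProducerConfig(props)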

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer.async
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{TimeUnit, ScheduledThreadPoolExecutor, ScheduledExecutorService, BlockingQueue}
+
+class AsyncProducerStats[T](queue: BlockingQueue[QueueItem[T]]) extends AsyncProducerStatsMBean {
+  val droppedEvents = new AtomicInteger(0)
+  val numEvents = new AtomicInteger(0)
+
+  def getAsyncProducerQueueSize: Int = queue.size
+
+  def getAsyncProducerDroppedEvents: Int = droppedEvents.get
+
+  def recordDroppedEvents = droppedEvents.getAndAdd(1)
+
+  def recordEvent = numEvents.getAndAdd(1)
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer.async
+
+trait AsyncProducerStatsMBean {
+  def getAsyncProducerQueueSize: Int
+  def getAsyncProducerDroppedEvents: Int
+}
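
Since AsyncProducer registers its stats under AsyncProducer.ProducerMBeanName, they can be read back through the platform MBean server; JMX derives the attribute names from the getters above:

    import java.lang.management.ManagementFactory
    import javax.management.ObjectName

    val mbs = ManagementFactory.getPlatformMBeanServer
    val name = new ObjectName("kafka.producer.Producer:type=AsyncProducerStats")
    val queued  = mbs.getAttribute(name, "AsyncProducerQueueSize").asInstanceOf[Int]
    val dropped = mbs.getAttribute(name, "AsyncProducerDroppedEvents").asInstanceOf[Int]
    println("queued=" + queued + ", dropped=" + dropped)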

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/CallbackHandler.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/CallbackHandler.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/CallbackHandler.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.producer.async
+
+import java.util.Properties
+
+/**
+ * Callback handler APIs for use in the async producer. These give the user
+ * hooks for inserting custom functionality at various stages as data flows
+ * through the async producer's pipeline.
+ */
+trait CallbackHandler[T] {
+  /**
+   * Initializes the callback handler using a Properties object
+   * @param props properties used to initialize the callback handler
+   */
+  def init(props: Properties)
+
+  /**
+   * Callback to process the data before it enters the batching queue
+   * of the asynchronous producer
+   * @param data the data sent to the producer
+   * @return the processed data that enters the queue
+   */
+  def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T]
+
+  /**
+   * Callback to process the data right after it enters the batching queue
+   * of the asynchronous producer
+   * @param data the data sent to the producer
+   * @param added flag that indicates if the data was successfully added to the queue
+   */
+  def afterEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]], added: Boolean)
+
+  /**
+   * Callback to process the data item right after it has been dequeued by the
+   * background sender thread of the asynchronous producer
+   * @param data the data item dequeued from the async producer queue
+   * @return the processed list of data items that gets added to the data handled by the event handler
+   */
+  def afterDequeuingExistingData(data: QueueItem[T] = null): scala.collection.mutable.Seq[QueueItem[T]]
+
+  /**
+   * Callback to process the batched data right before it is being sent by the
+   * handle API of the event handler
+   * @param data the batched data received by the event handler
+   * @return the processed batched data that gets sent by the handle() API of the event handler
+   */
+  def beforeSendingData(data: Seq[QueueItem[T]] = null): scala.collection.mutable.Seq[QueueItem[T]]
+
+  /**
+   * Callback to process the last batch of data right before the producer send thread is shutdown
+   * @return the last batch of data that is sent to the EventHandler
+  */
+  def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[T]]
+
+  /**
+   * Cleans up and shuts down the callback handler
+   */
+  def close
+}
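
A pass-through implementation makes the contract concrete; this sketch (class name hypothetical) leaves the data unchanged and contributes no extra events:

    import java.util.Properties
    import scala.collection.mutable.ListBuffer

    class NoOpCallbackHandler[T] extends CallbackHandler[T] {
      def init(props: Properties) { }
      def beforeEnqueue(data: QueueItem[T]) = data
      def afterEnqueue(data: QueueItem[T], added: Boolean) { }
      def afterDequeuingExistingData(data: QueueItem[T]) = ListBuffer(data)
      def beforeSendingData(data: Seq[QueueItem[T]]) = ListBuffer(data: _*)
      def lastBatchBeforeClose = new ListBuffer[QueueItem[T]]
      def close { }
    }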

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer.async
+
+import collection.mutable.HashMap
+import collection.mutable.Map
+import org.apache.log4j.Logger
+import kafka.api.ProducerRequest
+import kafka.serializer.Encoder
+import java.util.Properties
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
+import kafka.producer.{ProducerConfig, SyncProducerConfigShared, SyncProducerConfig, SyncProducer}
+
+private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
+                                            val cbkHandler: CallbackHandler[T]) extends EventHandler[T] {
+
+  private val logger = Logger.getLogger(classOf[DefaultEventHandler[T]])
+
+  override def init(props: Properties) { }
+
+  override def handle(events: Seq[QueueItem[T]], syncProducer: SyncProducer, serializer: Encoder[T]) {
+    var processedEvents = events
+    if(cbkHandler != null)
+      processedEvents = cbkHandler.beforeSendingData(events)
+    send(serialize(collate(processedEvents), serializer), syncProducer)
+  }
+
+  private def send(messagesPerTopic: Map[(String, Int), ByteBufferMessageSet], syncProducer: SyncProducer) {
+    if(messagesPerTopic.size > 0) {
+      val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
+      syncProducer.multiSend(requests)
+      if(logger.isTraceEnabled)
+        logger.trace("kafka producer sent messages for topics " + messagesPerTopic)
+    }
+  }
+
+  private def serialize(eventsPerTopic: Map[(String,Int), Seq[T]],
+                        serializer: Encoder[T]): Map[(String, Int), ByteBufferMessageSet] = {
+    import scala.collection.JavaConversions._
+    val eventsPerTopicMap = eventsPerTopic.map(e => ((e._1._1, e._1._2) , e._2.map(l => serializer.toMessage(l))))
+    val topicsAndPartitions = eventsPerTopic.map(e => e._1)
+    /** enforce the compressed.topics config here:
+     *  if the compression codec is anything other than NoCompressionCodec,
+     *    enable compression only for the specified topics, if any;
+     *    if the list of compressed topics is empty, enable the codec for all topics.
+     *  If the compression codec is NoCompressionCodec, compression is disabled for all topics.
+     */
+    val messages = eventsPerTopicMap.map(e => {
+      config.compressionCodec match {
+        case NoCompressionCodec =>
+          if(logger.isDebugEnabled)
+            logger.debug("Sending %d messages with no compression".format(e._2.size))
+          new ByteBufferMessageSet(NoCompressionCodec, e._2: _*)
+        case _ =>
+          config.compressedTopics.size match {
+            case 0 =>
+              if(logger.isDebugEnabled)
+                logger.debug("Sending %d messages with compression %d".format(e._2.size, config.compressionCodec.codec))
+              new ByteBufferMessageSet(config.compressionCodec, e._2: _*)
+            case _ =>
+              if(config.compressedTopics.contains(e._1._1)) {
+                if(logger.isDebugEnabled)
+                  logger.debug("Sending %d messages with compression %d".format(e._2.size, config.compressionCodec.codec))
+                new ByteBufferMessageSet(config.compressionCodec, e._2: _*)
+              }
+              else {
+                if(logger.isDebugEnabled)
+                  logger.debug("Sending %d messages with no compression as %s is not in compressed.topics - %s"
+                    .format(e._2.size, e._1._1, config.compressedTopics.toString))
+                new ByteBufferMessageSet(NoCompressionCodec, e._2: _*)
+              }
+          }
+      }
+    })
+    topicsAndPartitions.zip(messages)
+  }
+
+  private def collate(events: Seq[QueueItem[T]]): Map[(String,Int), Seq[T]] = {
+    val collatedEvents = new HashMap[(String, Int), Seq[T]]
+    val distinctTopics = events.map(e => e.getTopic).toSeq.distinct
+    val distinctPartitions = events.map(e => e.getPartition).distinct
+
+    var remainingEvents = events
+    distinctTopics foreach { topic =>
+      val topicEvents = remainingEvents partition (e => e.getTopic.equals(topic))
+      remainingEvents = topicEvents._2
+      distinctPartitions.foreach { p =>
+        val topicPartitionEvents = topicEvents._1 partition (e => (e.getPartition == p))
+        collatedEvents += ( (topic, p) -> topicPartitionEvents._1.map(q => q.getData).toSeq)
+      }
+    }
+    collatedEvents
+  }
+
+  override def close = {
+  }
+}
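
The collate step above is essentially a group-by on the (topic, partition) pair; the same shape can be produced directly with the standard library (data values illustrative):

    val items = Seq(new QueueItem("a", "clicks", 0),
                    new QueueItem("b", "clicks", 1),
                    new QueueItem("c", "views", 0))
    val collated = items.groupBy(q => (q.getTopic, q.getPartition))
                        .map(kv => (kv._1, kv._2.map(_.getData)))
    // Map((clicks,0) -> Seq(a), (clicks,1) -> Seq(b), (views,0) -> Seq(c))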

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/EventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/EventHandler.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/EventHandler.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/EventHandler.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.producer.async
+
+import java.util.Properties
+import kafka.producer.SyncProducer
+import kafka.serializer.Encoder
+
+/**
+ * Handler that dispatches the batched data from the queue of the
+ * asynchronous producer.
+ */
+trait EventHandler[T] {
+  /**
+   * Initializes the event handler using a Properties object
+   * @param props the properties used to initialize the event handler
+  */
+  def init(props: Properties) {}
+
+  /**
+   * Callback to dispatch the batched data and send it to a Kafka server
+   * @param events the data sent to the producer
+   * @param producer the low-level producer used to send the data
+  */
+  def handle(events: Seq[QueueItem[T]], producer: SyncProducer, encoder: Encoder[T])
+
+  /**
+   * Cleans up and shuts down the event handler
+  */
+  def close {}
+}
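
A sketch of a custom handler (class name hypothetical; to be loadable via the event.handler property it presumably needs a no-arg constructor, and the one-message-per-request packing here is deliberately naive):

    import org.apache.log4j.Logger
    import kafka.api.ProducerRequest
    import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
    import kafka.producer.SyncProducer
    import kafka.serializer.Encoder

    class LoggingEventHandler[T] extends EventHandler[T] {
      private val logger = Logger.getLogger(classOf[LoggingEventHandler[T]])

      override def handle(events: Seq[QueueItem[T]], producer: SyncProducer, encoder: Encoder[T]) {
        logger.info("dispatching batch of " + events.size + " events")
        // build one single-message request per event and send them together
        val requests = events.map(e =>
          new ProducerRequest(e.getTopic, e.getPartition,
            new ByteBufferMessageSet(NoCompressionCodec, encoder.toMessage(e.getData)))).toArray
        producer.multiSend(requests)
      }
    }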

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/MissingConfigException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/MissingConfigException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/MissingConfigException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/MissingConfigException.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer.async
+
+/* Indicates a missing configuration parameter */
+class MissingConfigException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer.async
+
+import kafka.utils.SystemTime
+import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
+import org.apache.log4j.Logger
+import collection.mutable.ListBuffer
+import kafka.serializer.Encoder
+import kafka.producer.SyncProducer
+
+private[async] class ProducerSendThread[T](val threadName: String,
+                                           val queue: BlockingQueue[QueueItem[T]],
+                                           val serializer: Encoder[T],
+                                           val underlyingProducer: SyncProducer,
+                                           val handler: EventHandler[T],
+                                           val cbkHandler: CallbackHandler[T],
+                                           val queueTime: Long,
+                                           val batchSize: Int,
+                                           val shutdownCommand: Any) extends Thread(threadName) {
+
+  private val logger = Logger.getLogger(classOf[ProducerSendThread[T]])
+  private val shutdownLatch = new CountDownLatch(1)
+
+  override def run {
+
+    try {
+      val remainingEvents = processEvents
+      if(logger.isDebugEnabled) logger.debug("Remaining events = " + remainingEvents.size)
+
+      // handle remaining events
+      if(remainingEvents.size > 0) {
+        if(logger.isDebugEnabled)
+           logger.debug("Dispatching last batch of %d events to the event handler".format(remainingEvents.size))
+        tryToHandle(remainingEvents)
+      }
+    }catch {
+      case e: Exception => logger.error("Error in sending events: ", e)
+    }finally {
+      shutdownLatch.countDown
+    }
+  }
+
+  def awaitShutdown = shutdownLatch.await
+
+  def shutdown = {
+    handler.close
+    logger.info("Shutdown thread complete")
+  }
+
+  private def processEvents(): Seq[QueueItem[T]] = {
+    var lastSend = SystemTime.milliseconds
+    var events = new ListBuffer[QueueItem[T]]
+    var full: Boolean = false
+
+    // drain the queue until you get a shutdown command
+    Stream.continually(queue.poll(scala.math.max(0, queueTime - (lastSend - SystemTime.milliseconds)), TimeUnit.MILLISECONDS))
+                      .takeWhile(item => if(item != null) item.getData != shutdownCommand else true).foreach {
+      currentQueueItem =>
+        val elapsed = (SystemTime.milliseconds - lastSend)
+        // check whether the queue time has been reached. This happens when the poll above
+        // times out and returns null
+        val expired = currentQueueItem == null
+        if(currentQueueItem != null) {
+          // handle the dequeued current item
+          if(cbkHandler != null)
+            events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
+          else
+            events += currentQueueItem
+
+          // check if the batch size is reached
+          full = events.size >= batchSize
+        }
+        if(full || expired) {
+          if(logger.isDebugEnabled) {
+            if(expired) logger.debug(elapsed + " ms elapsed. Queue time reached. Sending..")
+            if(full) logger.debug("Batch full. Sending..")
+          }
+          // if either queue time has reached or batch size has reached, dispatch to event handler
+          tryToHandle(events)
+          lastSend = SystemTime.milliseconds
+          events = new ListBuffer[QueueItem[T]]
+        }
+    }
+    if(cbkHandler != null) {
+      logger.info("Invoking the callback handler before handling the last batch of %d events".format(events.size))
+      val addedEvents = cbkHandler.lastBatchBeforeClose
+      logEvents("last batch before close", addedEvents)
+      events = events ++ addedEvents
+    }
+    events
+  }
+
+  def tryToHandle(events: Seq[QueueItem[T]]) {
+    try {
+      if(logger.isDebugEnabled) logger.debug("Handling " + events.size + " events")
+      handler.handle(events, underlyingProducer, serializer)
+    }catch {
+      case e: Exception => logger.error("Error in handling batch of " + events.size + " events", e)
+    }
+  }
+
+  private def logEvents(tag: String, events: Iterable[QueueItem[T]]) {
+    if(logger.isTraceEnabled) {
+      logger.trace("events for " + tag + ":")
+      for (event <- events)
+        logger.trace(event.getData.toString)
+    }
+  }
+}
\ No newline at end of file
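
The dispatch policy in processEvents reduces to: poll with a timeout, and flush whenever the batch fills, the poll times out, or the shutdown sentinel arrives. A simplified standalone sketch of that loop (names illustrative; the real thread also threads the callback handler through):

    import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
    import scala.collection.mutable.ListBuffer

    def drain[A](queue: LinkedBlockingQueue[A], batchSize: Int, queueTimeMs: Long,
                 stop: A)(dispatch: Seq[A] => Unit) {
      val batch = new ListBuffer[A]
      var done = false
      while(!done) {
        val item = queue.poll(queueTimeMs, TimeUnit.MILLISECONDS)
        if(item == stop) done = true
        else if(item != null) batch += item
        // flush on a full batch, a poll timeout, or shutdown
        if(batch.size >= batchSize || item == null || done) {
          if(batch.size > 0) dispatch(batch)
          batch.clear
        }
      }
    }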

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/QueueClosedException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/QueueClosedException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/QueueClosedException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/QueueClosedException.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer.async
+
+/* Indicates that a client is sending an event to a closed queue */
+class QueueClosedException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/QueueFullException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/QueueFullException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/QueueFullException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/QueueFullException.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer.async
+
+/* Indicates that the producer's event queue is full of unsent messages */
+class QueueFullException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/serializer/Decoder.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/serializer/Decoder.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/serializer/Decoder.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/serializer/Decoder.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.serializer
+
+import kafka.message.Message
+
+trait Decoder[T] {
+  def toEvent(message: Message):T
+}
+
+class DefaultDecoder extends Decoder[Message] {
+  def toEvent(message: Message):Message = message
+}
+
+class StringDecoder extends Decoder[String] {
+  def toEvent(message: Message):String = {
+    val buf = message.payload
+    val arr = new Array[Byte](buf.remaining)
+    buf.get(arr)
+    new String(arr)
+  }
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/serializer/Encoder.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/serializer/Encoder.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/serializer/Encoder.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/serializer/Encoder.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.serializer
+
+import kafka.message.Message
+
+trait Encoder[T] {
+  def toMessage(event: T):Message
+}
+
+class DefaultEncoder extends Encoder[Message] {
+  override def toMessage(event: Message):Message = event
+}
+
+class StringEncoder extends Encoder[String] {
+  override def toMessage(event: String):Message = new Message(event.getBytes)
+}
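
A quick round trip through the String codec pair defined in Decoder.scala and Encoder.scala above:

    import kafka.serializer.{StringDecoder, StringEncoder}

    val encoder = new StringEncoder
    val decoder = new StringDecoder
    val message = encoder.toMessage("hello kafka")
    assert(decoder.toEvent(message) == "hello kafka")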

Added: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.Properties
+import kafka.utils.{Utils, ZKConfig}
+import kafka.message.Message
+
+/**
+ * Configuration settings for the kafka server
+ */
+class KafkaConfig(props: Properties) extends ZKConfig(props) {
+  /* the port to listen and accept connections on */
+  val port: Int = Utils.getInt(props, "port", 6667)
+
+  /* the hostname of the broker. If not set, the value returned by getLocalHost is used; with multiple network interfaces, getLocalHost may not return the address you want. */
+  val hostName: String = Utils.getString(props, "hostname", null)
+
+  /* the broker id for this server */
+  val brokerId: Int = Utils.getInt(props, "brokerid")
+  
+  /* the SO_SNDBUF buffer of the socket server's sockets */
+  val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)
+  
+  /* the SO_RCVBUF buffer of the socket server's sockets */
+  val socketReceiveBuffer: Int = Utils.getInt(props, "socket.receive.buffer", 100*1024)
+  
+  /* the maximum number of bytes in a socket request */
+  val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
+  
+  /* the number of worker threads that the server uses for handling all client requests*/
+  val numThreads = Utils.getIntInRange(props, "num.threads", Runtime.getRuntime().availableProcessors, (1, Int.MaxValue))
+  
+  /* the interval in which to measure performance statistics */
+  val monitoringPeriodSecs = Utils.getIntInRange(props, "monitoring.period.secs", 600, (1, Int.MaxValue))
+  
+  /* the default number of log partitions per topic */
+  val numPartitions = Utils.getIntInRange(props, "num.partitions", 1, (1, Int.MaxValue))
+  
+  /* the directory in which the log data is kept */
+  val logDir = Utils.getString(props, "log.dir")
+  
+  /* the maximum size of a single log file */
+  val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
+  
+  /* the number of messages accumulated on a log partition before messages are flushed to disk */
+  val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
+  
+  /* the number of hours to keep a log file before deleting it */
+  val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
+  
+  /* the number of hours to keep a log file before deleting it, for specific topics */
+  val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
+
+  /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
+  val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
+  
+  /* enable zookeeper registration in the server */
+  val enableZookeeper = Utils.getBoolean(props, "enable.zookeeper", true)
+
+  /* the maximum time in ms that a message in selected topics is kept in memory before being flushed to disk, e.g., topic1:3000,topic2:6000 */
+  val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))
+
+  /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
+  val flushSchedulerThreadRate = Utils.getInt(props, "log.default.flush.scheduler.interval.ms",  3000)
+
+  /* the maximum time in ms that a message in any topic is kept in memory before being flushed to disk */
+  val defaultFlushIntervalMs = Utils.getInt(props, "log.default.flush.interval.ms", flushSchedulerThreadRate)
+
+  /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
+  val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))
+}
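
Only brokerid and log.dir have no defaults above (plus whatever ZKConfig requires); a minimal working configuration might look like this sketch (the zk.connect key belongs to ZKConfig and is assumed here):

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties
    props.put("brokerid", "0")                 // required: no default
    props.put("log.dir", "/tmp/kafka-logs")    // required: no default
    props.put("port", "9092")                  // overrides the 6667 default
    props.put("enable.zookeeper", "true")
    props.put("zk.connect", "localhost:2181")  // assumed ZKConfig key
    val config = new KafkaConfig(props)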

Added: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio.channels._
+import org.apache.log4j.Logger
+import kafka.producer._
+import kafka.consumer._
+import kafka.log._
+import kafka.network._
+import kafka.message._
+import kafka.server._
+import kafka.api._
+import kafka.common.ErrorMapping
+import kafka.utils.{Utils, SystemTime}
+import java.io.IOException
+
+/**
+ * Logic to handle the various Kafka requests
+ */
+private[kafka] class KafkaRequestHandlers(val logManager: LogManager) {
+  
+  private val logger = Logger.getLogger(classOf[KafkaRequestHandlers])
+  private val requestLogger = Logger.getLogger("kafka.request.logger")
+
+  def handlerFor(requestTypeId: Short, request: Receive): Handler.Handler = {
+    requestTypeId match {
+      case RequestKeys.Produce => handleProducerRequest _
+      case RequestKeys.Fetch => handleFetchRequest _
+      case RequestKeys.MultiFetch => handleMultiFetchRequest _
+      case RequestKeys.MultiProduce => handleMultiProducerRequest _
+      case RequestKeys.Offsets => handleOffsetRequest _
+      case _ => throw new IllegalStateException("No mapping found for handler id " + requestTypeId)
+    }
+  }
+  
+  def handleProducerRequest(receive: Receive): Option[Send] = {
+    val sTime = SystemTime.milliseconds
+    val request = ProducerRequest.readFrom(receive.buffer)
+
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Producer request " + request.toString)
+    handleProducerRequest(request, "ProduceRequest")
+    if (logger.isDebugEnabled)
+      logger.debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms")
+    None
+  }
+
+  def handleMultiProducerRequest(receive: Receive): Option[Send] = {
+    val request = MultiProducerRequest.readFrom(receive.buffer)
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Multiproducer request ")
+    request.produces.map(handleProducerRequest(_, "MultiProducerRequest"))
+    None
+  }
+
+  private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String) = {
+    val partition = request.getTranslatedPartition(logManager.chooseRandomPartition)
+    try {
+      logManager.getOrCreateLog(request.topic, partition).append(request.messages)
+      if(logger.isTraceEnabled)
+        logger.trace(request.messages.sizeInBytes + " bytes written to logs.")
+    }
+    catch {
+      case e =>
+        logger.error("error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
+        e match {
+          case _: IOException =>
+            logger.error("force shutdown due to " + e)
+            Runtime.getRuntime.halt(1)
+          case _ =>
+        }
+        throw e
+    }
+    None
+  }
+
+  def handleFetchRequest(request: Receive): Option[Send] = {
+    val fetchRequest = FetchRequest.readFrom(request.buffer)
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Fetch request " + fetchRequest.toString)
+    Some(readMessageSet(fetchRequest))
+  }
+  
+  def handleMultiFetchRequest(request: Receive): Option[Send] = {
+    val multiFetchRequest = MultiFetchRequest.readFrom(request.buffer)
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Multifetch request")
+    multiFetchRequest.fetches.foreach(req => requestLogger.trace(req.toString))
+    var responses = multiFetchRequest.fetches.map(fetch =>
+        readMessageSet(fetch)).toList
+    
+    Some(new MultiMessageSetSend(responses))
+  }
+
+  private def readMessageSet(fetchRequest: FetchRequest): MessageSetSend = {
+    var  response: MessageSetSend = null
+    try {
+      logger.trace("Fetching log segment for topic = " + fetchRequest.topic + " and partition = " + fetchRequest.partition)
+      val log = logManager.getOrCreateLog(fetchRequest.topic, fetchRequest.partition)
+      response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize))
+    }
+    catch {
+      case e =>
+        logger.error("error when processing request " + fetchRequest, e)
+        response=new MessageSetSend(MessageSet.Empty, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    }
+    response
+  }
+
+  def handleOffsetRequest(request: Receive): Option[Send] = {
+    val offsetRequest = OffsetRequest.readFrom(request.buffer)
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Offset request " + offsetRequest.toString)
+    val log = logManager.getOrCreateLog(offsetRequest.topic, offsetRequest.partition)
+    val offsets = log.getOffsetsBefore(offsetRequest)
+    val response = new OffsetArraySend(offsets)
+    Some(response)
+  }
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import scala.reflect.BeanProperty
+import org.apache.log4j.Logger
+import kafka.log.LogManager
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicBoolean
+import kafka.utils.{Utils, SystemTime, KafkaScheduler}
+import kafka.network.{SocketServerStats, SocketServer}
+import java.io.File
+
+/**
+ * Represents the lifecycle of a single Kafka broker. Handles all functionality required
+ * to start up and shutdown a single Kafka node.
+ */
+class KafkaServer(val config: KafkaConfig) {
+  val CLEAN_SHUTDOWN_FILE = ".kafka_cleanshutdown"
+  private val isShuttingDown = new AtomicBoolean(false)
+  
+  private val logger = Logger.getLogger(classOf[KafkaServer])
+  private val shutdownLatch = new CountDownLatch(1)
+  private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
+  
+  var socketServer: SocketServer = null
+  
+  val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
+  
+  private var logManager: LogManager = null
+
+  /**
+   * Start up API for bringing up a single instance of the Kafka server.
+   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
+   */
+  def startup() {
+    try {
+      logger.info("Starting Kafka server...")
+      var needRecovery = true
+      val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
+      if (cleanShutDownFile.exists) {
+        needRecovery = false
+        cleanShutDownFile.delete
+      }
+      logManager = new LogManager(config,
+                                  scheduler,
+                                  SystemTime,
+                                  1000L * 60 * config.logCleanupIntervalMinutes,
+                                  1000L * 60 * 60 * config.logRetentionHours,
+                                  needRecovery)
+                                                    
+      val handlers = new KafkaRequestHandlers(logManager)
+      socketServer = new SocketServer(config.port,
+                                      config.numThreads,
+                                      config.monitoringPeriodSecs,
+                                      handlers.handlerFor)
+      Utils.swallow(logger.warn, Utils.registerMBean(socketServer.stats, statsMBeanName))
+      socketServer.startup
+      /**
+       *  Registers this broker in ZK. After this point, consumers can connect to the broker,
+       *  so registration must happen after the socket server has started.
+       */
+      logManager.startup
+      logger.info("Server started.")
+    }
+    catch {
+      case e =>
+        logger.fatal(e)
+        logger.fatal(Utils.stackTrace(e))
+        shutdown
+    }
+  }
+  
+  /**
+   * Shutdown API for shutting down a single instance of the Kafka server.
+   * Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread
+   */
+  def shutdown() {
+    val canShutdown = isShuttingDown.compareAndSet(false, true);
+    if (canShutdown) {
+      logger.info("Shutting down...")
+      try {
+        scheduler.shutdown
+        socketServer.shutdown()
+        Utils.swallow(logger.warn, Utils.unregisterMBean(statsMBeanName))
+        logManager.close()
+
+        val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
+        cleanShutDownFile.createNewFile
+      }
+      catch {
+        case e =>
+          logger.fatal(e)
+          logger.fatal(Utils.stackTrace(e))
+      }
+      shutdownLatch.countDown()
+      logger.info("shut down completed")
+    }
+  }
+  
+  /**
+   * After calling shutdown(), use this API to wait until the shutdown is complete
+   */
+  def awaitShutdown(): Unit = shutdownLatch.await()
+
+  def getLogManager(): LogManager = logManager
+
+  def getStats(): SocketServerStats = socketServer.stats
+}
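
Embedding a broker then reduces to constructing a KafkaConfig and driving this lifecycle; a sketch with a JVM shutdown hook so the clean-shutdown marker file gets written on exit:

    val server = new KafkaServer(config)  // a KafkaConfig as sketched above
    server.startup()
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run() {
        server.shutdown()
        server.awaitShutdown()
      }
    })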

Added: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.log4j.Logger
+import kafka.consumer.{Consumer, ConsumerConnector, ConsumerConfig}
+import kafka.utils.{SystemTime, Utils}
+import kafka.api.RequestKeys
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
+
+class KafkaServerStartable(val serverConfig: KafkaConfig, val consumerConfig: ConsumerConfig) {
+  private var server : KafkaServer = null
+  private var embeddedConsumer : EmbeddedConsumer = null
+
+  init
+
+  def this(serverConfig: KafkaConfig) = this(serverConfig, null)
+
+  private def init() {
+    server = new KafkaServer(serverConfig)
+    if (consumerConfig != null)
+      embeddedConsumer = new EmbeddedConsumer(consumerConfig, server)
+  }
+
+  def startup() {
+    server.startup
+    if (embeddedConsumer != null)
+      embeddedConsumer.startup
+  }
+
+  def shutdown() {
+    if (embeddedConsumer != null)
+      embeddedConsumer.shutdown
+    server.shutdown
+  }
+
+  def awaitShutdown() {
+    server.awaitShutdown
+  }
+}
+
+class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
+                       private val kafkaServer: KafkaServer) {
+  private val logger = Logger.getLogger(getClass())
+  private val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+  private val topicMessageStreams = consumerConnector.createMessageStreams(consumerConfig.embeddedConsumerTopicMap)
+
+  def startup() = {
+    var threadList = List[Thread]()
+    for ((topic, streamList) <- topicMessageStreams)
+      for (i <- 0 until streamList.length)
+        threadList ::= Utils.newThread("kafka-embedded-consumer-" + topic + "-" + i, new Runnable() {
+          def run() {
+            logger.info("starting consumer thread " + i + " for topic " + topic)
+            val logManager = kafkaServer.getLogManager
+            val stats = kafkaServer.getStats
+            try {
+              for (message <- streamList(i)) {
+                val partition = logManager.chooseRandomPartition(topic)
+                val start = SystemTime.nanoseconds
+                logManager.getOrCreateLog(topic, partition).append(
+                                                          new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                          messages = message))
+                stats.recordRequest(RequestKeys.Produce, SystemTime.nanoseconds - start)
+              }
+            }
+            catch {
+              case e =>
+                logger.fatal(e + Utils.stackTrace(e))
+                logger.fatal(topic + " stream " + i + " unexpectedly exited")
+            }
+          }
+        }, false)
+
+    for (thread <- threadList)
+      thread.start
+  }
+
+  def shutdown() = {
+    consumerConnector.shutdown
+  }
+}
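
A main() for a broker-only deployment would drive the startable directly; a sketch using the single-argument constructor (no embedded consumer), with illustrative property values and assuming enable.zookeeper=false suffices for a lone local broker:

    import java.util.Properties
    import kafka.server.{KafkaConfig, KafkaServerStartable}

    object KafkaStandalone {
      def main(args: Array[String]) {
        val props = new Properties
        props.put("brokerid", "0")
        props.put("log.dir", "/tmp/kafka-logs")
        props.put("enable.zookeeper", "false")  // single local broker, no ZK
        val startable = new KafkaServerStartable(new KafkaConfig(props))
        startable.startup()
        Runtime.getRuntime.addShutdownHook(new Thread() {
          override def run() = startable.shutdown()
        })
        startable.awaitShutdown()
      }
    }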

Added: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils._
+import org.apache.log4j.Logger
+import kafka.cluster.Broker
+import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+import kafka.log.LogManager
+import java.net.InetAddress
+
+/**
+ * Handles the server's interaction with zookeeper. The server needs to register the following paths:
+ *   /topics/[topic]/[node_id-partition_num]
+ *   /brokers/[0...N] --> host:port
+ * 
+ */
+class KafkaZooKeeper(config: KafkaConfig, logManager: LogManager) {
+  
+  private val logger = Logger.getLogger(classOf[KafkaZooKeeper])
+  
+  val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
+  var zkClient: ZkClient = null
+  var topics: List[String] = Nil
+  val lock = new Object()
+  
+  def startup() {
+    /* start client */
+    logger.info("connecting to ZK: " + config.zkConnect)
+    zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
+    zkClient.subscribeStateChanges(new SessionExpireListener)
+  }
+
+  def registerBrokerInZk() {
+    logger.info("Registering broker " + brokerIdPath)
+    val hostName = if (config.hostName == null) InetAddress.getLocalHost.getHostAddress else config.hostName
+    val creatorId = hostName + "-" + System.currentTimeMillis
+    val broker = new Broker(config.brokerId, creatorId, hostName, config.port)
+    ZkUtils.createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
+    logger.info("Registering broker " + brokerIdPath + " succeeded with " + broker)
+  }
+
+  def registerTopicInZk(topic: String) {
+    registerTopicInZkInternal(topic)
+    lock synchronized {
+      topics ::= topic
+    }
+  }
+
+  def registerTopicInZkInternal(topic: String) {
+    val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + config.brokerId
+    val numParts = logManager.getTopicPartitionsMap.getOrElse(topic, config.numPartitions)
+    logger.info("Begin registering broker topic " + brokerTopicPath + " with " + numParts.toString + " partitions")
+    ZkUtils.createEphemeralPathExpectConflict(zkClient, brokerTopicPath, numParts.toString)
+    logger.info("End registering broker topic " + brokerTopicPath)
+  }
+
+  /**
+   *  When we get a SessionExpired event, we have lost all our ephemeral nodes and zkclient has
+   *  re-established the connection for us. We need to re-register this broker in the broker registry.
+   */
+  class SessionExpireListener() extends IZkStateListener {
+    @throws(classOf[Exception])
+    def handleStateChanged(state: KeeperState) {
+      // do nothing, since zkclient will do reconnect for us.
+    }
+
+    /**
+     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
+     * any ephemeral nodes here.
+     *
+     * @throws Exception
+     *             On any error.
+     */
+    @throws(classOf[Exception])
+    def handleNewSession() {
+      logger.info("re-registering broker info in ZK for broker " + config.brokerId)
+      registerBrokerInZk()
+      lock synchronized {
+        logger.info("re-registering broker topics in ZK for broker " + config.brokerId)
+        for (topic <- topics)
+          registerTopicInZkInternal(topic)
+      }
+      logger.info("done re-registering broker")
+    }
+  }
+
+  def close() {
+    if (zkClient != null) {
+      logger.info("Closing zookeeper client...")
+      zkClient.close()
+    }
+  } 
+  
+}
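
For readers tracing the registration flow, a minimal usage sketch; this is not
part of the commit, and the config and logManager values are assumed to come
from the server's startup path:

    val zk = new KafkaZooKeeper(config, logManager)
    zk.startup()                       // connect to ZK, subscribe the SessionExpireListener
    zk.registerBrokerInZk()            // ephemeral node at ZkUtils.BrokerIdsPath/<brokerId>
    zk.registerTopicInZk("my-topic")   // ephemeral topic node, remembered for re-registration
    // after a session expiry, handleNewSession() re-creates both registrations
    zk.close()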

Added: incubator/kafka/trunk/core/src/main/scala/kafka/server/MessageSetSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/MessageSetSend.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/MessageSetSend.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/MessageSetSend.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio._
+import java.nio.channels._
+import kafka.network._
+import kafka.message._
+import kafka.utils._
+import kafka.common.ErrorMapping
+
+/**
+ * A zero-copy message response that writes the needed bytes directly from the file,
+ * entirely in kernel space.
+ */
+@nonthreadsafe
+private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Int) extends Send {
+  
+  private var sent: Long = 0
+  private val size: Long = messages.sizeInBytes
+  private val header = ByteBuffer.allocate(6)
+  header.putInt(size.asInstanceOf[Int] + 2)
+  header.putShort(errorCode.asInstanceOf[Short])
+  header.rewind()
+  
+  var complete: Boolean = false
+
+  def this(messages: MessageSet) = this(messages, ErrorMapping.NoError)
+
+  def writeTo(channel: WritableByteChannel): Int = {
+    expectIncomplete()
+    var written = 0
+    if(header.hasRemaining)
+      written += channel.write(header)
+    if(!header.hasRemaining) {
+      val fileBytesSent = messages.writeTo(channel, sent, size - sent)
+      written += fileBytesSent.asInstanceOf[Int]
+      sent += fileBytesSent
+    }
+
+    if(logger.isTraceEnabled)
+      if (channel.isInstanceOf[SocketChannel]) {
+        val socketChannel = channel.asInstanceOf[SocketChannel]
+        logger.trace(sent + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " expecting to send " + size + " bytes")
+      }
+
+    if(sent >= size)
+      complete = true
+    written
+  }
+  
+  def sendSize: Int = size.asInstanceOf[Int] + header.capacity
+  
+}
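
The framing written above is a 6-byte header (a 4-byte size covering the error
code plus the message bytes, then a 2-byte error code) followed by the raw
message set. A client-side decoding sketch; the function name is illustrative,
not from this commit:

    import java.nio.ByteBuffer

    def readMessageSetHeader(header: ByteBuffer): (Int, Short) = {
      val size = header.getInt()         // messages.sizeInBytes + 2, as written by MessageSetSend
      val errorCode = header.getShort()  // ErrorMapping code; 0 means no error
      (size - 2, errorCode)              // size - 2 bytes of message data follow on the wire
    }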

Added: incubator/kafka/trunk/core/src/main/scala/kafka/server/MultiMessageSetSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/MultiMessageSetSend.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/MultiMessageSetSend.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/MultiMessageSetSend.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio._
+import java.nio.channels._
+import kafka.network._
+import kafka.message._
+import kafka.utils._
+
+/**
+ * A set of message sets prefixed by size
+ */
+@nonthreadsafe
+private[server] class MultiMessageSetSend(val sets: List[MessageSetSend]) extends MultiSend(new ByteBufferSend(6) :: sets) {
+  
+  val buffer = this.sends.head.asInstanceOf[ByteBufferSend].buffer
+  val allMessageSetSize: Int = sets.foldLeft(0)(_ + _.sendSize)
+  val expectedBytesToWrite: Int = 4 + 2 + allMessageSetSize
+  buffer.putInt(2 + allMessageSetSize)
+  buffer.putShort(0)
+  buffer.rewind()
+  
+}
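
A worked example of the size bookkeeping above, using two hypothetical message
sets of 100 and 250 payload bytes (each MessageSetSend contributes its own
6-byte header through sendSize):

    val sendSizes = List(100 + 6, 250 + 6)                // payload + per-set header
    val allMessageSetSize = sendSizes.sum                 // 362, i.e. sets.foldLeft(0)(_ + _.sendSize)
    val expectedBytesToWrite = 4 + 2 + allMessageSetSize  // 368, adding the outer 6-byte header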

Added: incubator/kafka/trunk/core/src/main/scala/kafka/server/package.html
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/package.html?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/package.html (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/package.html Mon Aug  1 23:41:24 2011
@@ -0,0 +1 @@
+The Kafka server.
\ No newline at end of file

Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.net.URI
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicLong
+import java.nio.channels.ClosedByInterruptException
+import joptsimple._
+import org.apache.log4j.Logger
+import kafka.utils.Utils
+import kafka.consumer.{ConsumerConfig, ConsumerConnector, Consumer}
+
+abstract class ShutdownableThread(name: String) extends Thread(name) {
+  def shutdown(): Unit  
+}
+
+/**
+ * Performance test for the full zookeeper consumer
+ */
+object ConsumerPerformance {
+  private val logger = Logger.getLogger(getClass())
+
+  def main(args: Array[String]): Unit = {
+    
+    val parser = new OptionParser
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
+                           .withRequiredArg
+                           .describedAs("topic")
+                           .ofType(classOf[String])
+    val consumerPropsOpt = parser.accepts("props", "REQUIRED: Properties file with the consumer properties.")
+                           .withRequiredArg
+                           .describedAs("properties")
+                           .ofType(classOf[String])
+    val numThreadsOpt = parser.accepts("threads", "Number of processing threads.")
+                           .withRequiredArg
+                           .describedAs("count")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(10)
+    val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.")
+                           .withRequiredArg
+                           .describedAs("size")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(100000)
+    val sleepSecsOpt = parser.accepts("sleep", "Number of seconds to sleep before starting consumption.")
+                           .withRequiredArg
+                           .describedAs("secs")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(5)
+    
+    val options = parser.parse(args : _*)
+    
+    for(arg <- List(topicOpt, consumerPropsOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"") 
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val numThreads = options.valueOf(numThreadsOpt).intValue
+    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
+    val propsFile = options.valueOf(consumerPropsOpt)
+    val topic = options.valueOf(topicOpt)
+    val initialSleep = options.valueOf(sleepSecsOpt).intValue * 1000
+    
+    println("Starting consumer...")
+    val totalNumMsgs = new AtomicLong(0)
+    val totalNumBytes = new AtomicLong(0)
+    
+    val consumerConfig = new ConsumerConfig(Utils.loadProps(propsFile))
+    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+
+    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> numThreads))
+    var threadList = List[ShutdownableThread]()
+    for ((topic, streamList) <- topicMessageStreams)
+      for (i <- 0 until streamList.length)
+        threadList ::= new ShutdownableThread("kafka-zk-consumer-" + i) {
+          private val shutdownLatch = new CountDownLatch(1)
+
+          def shutdown(): Unit = {
+            interrupt
+            shutdownLatch.await
+          }
+
+          override def run() {
+            var totalBytesRead = 0L
+            var nMessages = 0L
+            val startMs = System.currentTimeMillis
+
+            try {
+              for (message <- streamList(i)) {
+                nMessages += 1
+                totalBytesRead += message.payloadSize
+                if (nMessages % reportingInterval == 0) {
+                  val elapsedSecs = (System.currentTimeMillis - startMs) / 1000.0
+                  printMessage(totalBytesRead, nMessages, elapsedSecs)
+                }
+              }
+            }
+            catch {
+              case _: InterruptedException =>       // expected: shutdown() interrupts this thread
+              case _: ClosedByInterruptException => // expected: the interrupt closes the channel
+              case e => throw e
+            }
+            totalNumMsgs.addAndGet(nMessages)
+            totalNumBytes.addAndGet(totalBytesRead)
+            val elapsedSecs = (System.currentTimeMillis - startMs) / 1000.0
+            printMessage(totalBytesRead, nMessages, elapsedSecs)
+            shutdownComplete
+          }
+
+          private def printMessage(totalBytesRead: Long, nMessages: Long, elapsedSecs: Double) = {
+            logger.info("thread[" + i + "], nMsgs:" + nMessages + " bytes:" + totalBytesRead +
+              " nMsgs/sec:" + (nMessages / elapsedSecs).formatted("%.2f") +
+              " MB/sec:" + (totalBytesRead / elapsedSecs / (1024.0*1024.0)).formatted("%.2f"))
+
+          }
+          private def shutdownComplete() = shutdownLatch.countDown
+        }
+
+    logger.info("Sleeping for " + initialSleep / 1000 + " seconds.")
+    Thread.sleep(initialSleep)
+    logger.info("starting threads")
+    for (thread <- threadList)
+      thread.start
+
+    // attach shutdown handler to catch control-c
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      override def run() = {
+        for (thread <- threadList)
+          thread.shutdown
+
+        try {
+          consumerConnector.shutdown
+        }
+        catch {
+          case _ => // ignore errors from the connector during shutdown
+        }
+        println("total nMsgs: " + totalNumMsgs)
+        println("totalBytesRead " + totalNumBytes)
+      }
+    });
+
+  }
+
+}
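
A hypothetical invocation, assuming the kafka-run-class.sh helper from
trunk/bin and a properties file carrying the consumer's zookeeper connection
and group id:

    bin/kafka-run-class.sh kafka.tools.ConsumerPerformance \
        --topic test --props consumer.properties --threads 10 \
        --reporting-interval 100000 --sleep 5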

Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import joptsimple._
+import kafka.utils.Utils
+import java.util.concurrent.CountDownLatch
+import org.apache.log4j.Logger
+import kafka.consumer._
+
+/**
+ * Program to read using the rich consumer and dump the results to standard out
+ */
+object ConsumerShell {
+  val logger = Logger.getLogger(getClass)
+  def main(args: Array[String]): Unit = {
+    
+    val parser = new OptionParser
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
+                           .withRequiredArg
+                           .describedAs("topic")
+                           .ofType(classOf[String])
+    val consumerPropsOpt = parser.accepts("props", "REQUIRED: Properties file with the consumer properties.")
+                           .withRequiredArg
+                           .describedAs("properties")
+                           .ofType(classOf[String])
+    val partitionsOpt = parser.accepts("partitions", "Number of partitions to consume from.")
+                           .withRequiredArg
+                           .describedAs("count")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(1)
+    
+    val options = parser.parse(args : _*)
+    
+    for(arg <- List(topicOpt, consumerPropsOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"") 
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+    
+    val partitions = options.valueOf(partitionsOpt).intValue
+    val propsFile = options.valueOf(consumerPropsOpt)
+    val topic = options.valueOf(topicOpt)
+    
+    println("Starting consumer...")
+
+    val consumerConfig = new ConsumerConfig(Utils.loadProps(propsFile))
+    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> partitions))
+    var threadList = List[ZKConsumerThread]()
+    for ((topic, streamList) <- topicMessageStreams)
+      for (stream <- streamList)
+        threadList ::= new ZKConsumerThread(stream)
+
+    for (thread <- threadList)
+      thread.start
+
+    // attach shutdown handler to catch control-c
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      override def run() = {
+        consumerConnector.shutdown
+        threadList.foreach(_.shutdown)
+        println("consumer threads shutted down")        
+      }
+    })
+  }
+}
+
+class ZKConsumerThread(stream: KafkaMessageStream) extends Thread {
+  val shutdownLatch = new CountDownLatch(1)
+  val logger = Logger.getLogger(getClass)
+
+  override def run() {
+    println("Starting consumer thread..")
+    var count: Int = 0
+    try {
+      for (message <- stream) {
+        logger.debug("consumed: " + Utils.toString(message.payload, "UTF-8"))
+        count += 1
+      }
+    } catch {
+      case e: ConsumerTimeoutException => // expected when no message arrives within the configured timeout
+      case oe: Exception => logger.error(oe)
+    }
+    shutdownLatch.countDown
+    println("Received " + count + " messages")
+    println("thread shutdown !" )
+  }
+
+  def shutdown() {
+    // wait for run() to finish; the stream ends once the connector has been shut down
+    shutdownLatch.await
+  }
+}
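
A hypothetical run; the property keys shown are assumptions about what
ConsumerConfig expects, for illustration only:

    # consumer.properties (assumed keys)
    #   zk.connect=localhost:2181
    #   groupid=shell-consumer
    bin/kafka-run-class.sh kafka.tools.ConsumerShell \
        --topic test --props consumer.properties --partitions 2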

Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/GetOffsetShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/GetOffsetShell.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/GetOffsetShell.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/GetOffsetShell.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,71 @@
+/*
+* Copyright 2010 LinkedIn
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package kafka.tools
+
+import kafka.consumer._
+import joptsimple._
+import java.net.URI
+
+object GetOffsetShell {
+
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
+                           .withRequiredArg
+                           .describedAs("kafka://hostname:port")
+                           .ofType(classOf[String])
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
+                           .withRequiredArg
+                           .describedAs("topic")
+                           .ofType(classOf[String])
+    val partitionOpt = parser.accepts("partition", "partition id")
+                           .withRequiredArg
+                           .describedAs("partition id")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(0)
+    val timeOpt = parser.accepts("time", "return offsets before this timestamp (-1 for latest, -2 for earliest)")
+                           .withRequiredArg
+                           .describedAs("timestamp/-1(latest)/-2(earliest)")
+                           .ofType(classOf[java.lang.Long])
+    val nOffsetsOpt = parser.accepts("offsets", "maximum number of offsets to return")
+                           .withRequiredArg
+                           .describedAs("count")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(1)
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(urlOpt, topicOpt, timeOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val url = new URI(options.valueOf(urlOpt))
+    val topic = options.valueOf(topicOpt)
+    val partition = options.valueOf(partitionOpt).intValue
+    val time = options.valueOf(timeOpt).longValue
+    val nOffsets = options.valueOf(nOffsetsOpt).intValue
+    val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000)
+    val offsets = consumer.getOffsetsBefore(topic, partition, time, nOffsets)
+    println("get " + offsets.length + " results")
+    for (offset <- offsets)
+      println(offset)
+  }
+}
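
A hypothetical invocation asking for the five latest offsets of partition 0,
with the same assumed helper script as above:

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
        --server kafka://localhost:9092 --topic test --partition 0 --time -1 --offsets 5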


