kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1152970 [18/26] - in /incubator/kafka: branches/ site/ trunk/ trunk/bin/ trunk/clients/ trunk/clients/clojure/ trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/ trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...
Date Mon, 01 Aug 2011 23:42:17 GMT
Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,341 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import java.net._
+import java.io._
+import java.nio._
+import java.nio.channels._
+
+import kafka.utils._
+
+import org.apache.log4j.Logger
+import kafka.api.RequestKeys
+
+/**
+ * An NIO socket server. The threading model is:
+ *   1 Acceptor thread that handles new connections
+ *   N Processor threads, each with its own selector, that synchronously handle all requests from the connections assigned to them
+ */
+private[kafka] class SocketServer(val port: Int,
+                   val numProcessorThreads: Int, 
+                   monitoringPeriodSecs: Int,
+                   private val handlerFactory: Handler.HandlerMapping) {
+ 
+  private val logger = Logger.getLogger(classOf[SocketServer])
+  private val time = SystemTime
+  private val processors = new Array[Processor](numProcessorThreads)
+  private var acceptor: Acceptor = new Acceptor(port, processors)
+  val stats: SocketServerStats = new SocketServerStats(1000L * 1000L * 1000L * monitoringPeriodSecs)
+  
+  /**
+   * Start the socket server
+   */
+  def startup() {
+    for(i <- 0 until numProcessorThreads) {
+      processors(i) = new Processor(handlerFactory, time, stats)
+      Utils.newThread("kafka-processor-" + i, processors(i), false).start()
+    }
+    Utils.newThread("kafka-acceptor", acceptor, false).start()
+    acceptor.awaitStartup
+  }
+  
+  /**
+   * Shutdown the socket server
+   */
+  def shutdown() = {
+    acceptor.shutdown
+    for(processor <- processors)
+      processor.shutdown
+  }
+    
+}
+
+/**
+ * A base class with some helper variables and methods
+ */
+private[kafka] abstract class AbstractServerThread extends Runnable {
+  
+  protected val selector = Selector.open();
+  protected val logger = Logger.getLogger(getClass())
+  private val startupLatch = new CountDownLatch(1)
+  private val shutdownLatch = new CountDownLatch(1)
+  private val alive = new AtomicBoolean(false) 
+  
+  /**
+   * Initiates a graceful shutdown by signaling the thread to stop and waiting for the shutdown to complete
+   */
+  def shutdown(): Unit = {
+    alive.set(false)
+    selector.wakeup
+    shutdownLatch.await
+  }
+  
+  /**
+   * Wait for the thread to completely start up
+   */
+  def awaitStartup(): Unit = startupLatch.await
+  
+  /**
+   * Record that the thread startup is complete
+   */
+  protected def startupComplete() = {
+    alive.set(true)  
+    startupLatch.countDown
+  }
+
+  /**
+   * Record that the thread shutdown is complete
+   */
+  protected def shutdownComplete() = shutdownLatch.countDown
+  
+  /**
+   * Is the server still running?
+   */
+  protected def isRunning = alive.get
+  
+}
+
+/**
+ * Thread that accepts and configures new connections. Only one of these is needed.
+ */
+private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor]) extends AbstractServerThread {
+  
+  /**
+   * Accept loop that checks for new connection attempts
+   */
+  def run() {
+    val serverChannel = ServerSocketChannel.open()
+    serverChannel.configureBlocking(false)
+    serverChannel.socket.bind(new InetSocketAddress(port))
+    serverChannel.register(selector, SelectionKey.OP_ACCEPT)
+    logger.info("Awaiting connections on port " + port)
+    startupComplete()
+
+    var currentProcessor = 0
+    while(isRunning) {
+      val ready = selector.select(500)
+      if(ready > 0) {
+        val keys = selector.selectedKeys()
+        val iter = keys.iterator()
+        while(iter.hasNext && isRunning) {
+          var key: SelectionKey = null
+          try {
+            key = iter.next
+            iter.remove()
+
+            if(key.isAcceptable)
+              accept(key, processors(currentProcessor))
+            else
+              throw new IllegalStateException("Unrecognized key state for acceptor thread.")
+
+            // round robin to the next processor thread
+            currentProcessor = (currentProcessor + 1) % processors.length
+          } catch {
+            case e: Throwable => logger.error("Error in acceptor", e)
+          }
+        }
+      }
+    }
+    logger.debug("Closing server socket and selector.")
+    Utils.swallow(logger.error, serverChannel.close())
+    Utils.swallow(logger.error, selector.close())
+    shutdownComplete()
+  }
+  
+  /*
+   * Accept a new connection
+   */
+  def accept(key: SelectionKey, processor: Processor) {
+    val socketChannel = key.channel().asInstanceOf[ServerSocketChannel].accept()
+    if(logger.isDebugEnabled)
+      logger.debug("Accepted connection from " + socketChannel.socket.getInetAddress() + " on " + socketChannel.socket.getLocalSocketAddress)
+    socketChannel.configureBlocking(false)
+    socketChannel.socket().setTcpNoDelay(true)
+    processor.accept(socketChannel)
+  }
+  
+}
+
+/**
+ * Thread that processes all requests from its assigned connections; each connection is handled by exactly one Processor.
+ * There are N of these running in parallel, each with its own selector.
+ */
+private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
+                val time: Time, 
+                val stats: SocketServerStats) extends AbstractServerThread {
+  
+  private val newConnections = new ConcurrentLinkedQueue[SocketChannel]();
+  private val requestLogger = Logger.getLogger("kafka.request.logger")
+
+  override def run() {
+    startupComplete()
+    while(isRunning) {
+      // setup any new connections that have been queued up
+      configureNewConnections()
+      
+      val ready = selector.select(500)
+      if(ready > 0) {
+        val keys = selector.selectedKeys()
+        val iter = keys.iterator()
+        while(iter.hasNext && isRunning) {
+          var key: SelectionKey = null
+          try {
+            key = iter.next
+            iter.remove()
+
+            if(key.isReadable)
+              read(key)
+            else if(key.isWritable)
+              write(key)
+            else if(!key.isValid)
+              close(key)
+            else
+              throw new IllegalStateException("Unrecognized key state for processor thread.")
+          } catch {
+            case e: EOFException => {
+              logger.info("Closing socket for " + channelFor(key).socket.getInetAddress + ".")
+              close(key)
+            }
+            case e: Throwable => {
+              logger.info("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error")
+              logger.error(e, e)
+              close(key)
+            }
+          }
+        }
+      }
+    }
+    logger.debug("Closing selector.")
+    Utils.swallow(logger.info, selector.close())
+    shutdownComplete()
+  }
+  
+  private def close(key: SelectionKey) {
+    val channel = key.channel.asInstanceOf[SocketChannel]
+    if(logger.isDebugEnabled)
+      logger.debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
+    Utils.swallow(logger.info, channel.socket().close())
+    Utils.swallow(logger.info, channel.close())
+    key.attach(null)
+    Utils.swallow(logger.info, key.cancel())
+  }
+  
+  /**
+   * Queue up a new connection for reading
+   */
+  def accept(socketChannel: SocketChannel) {
+    newConnections.add(socketChannel)
+    selector.wakeup()
+  }
+  
+  /**
+   * Register any new connections that have been queued up
+   */
+  private def configureNewConnections() {
+    while(newConnections.size() > 0) {
+      val channel = newConnections.poll()
+      if(logger.isDebugEnabled())
+        logger.debug("Listening to new connection from " + channel.socket.getRemoteSocketAddress)
+      channel.register(selector, SelectionKey.OP_READ)
+    }
+  }
+  
+  /**
+   * Handle a completed request producing an optional response
+   */
+  private def handle(key: SelectionKey, request: Receive): Option[Send] = {
+    val requestTypeId = request.buffer.getShort()
+    if(requestLogger.isTraceEnabled) {
+      requestTypeId match {
+        case RequestKeys.Produce =>
+          requestLogger.trace("Handling produce request from " + channelFor(key).socket.getRemoteSocketAddress())
+        case RequestKeys.Fetch =>
+          requestLogger.trace("Handling fetch request from " + channelFor(key).socket.getRemoteSocketAddress())
+        case RequestKeys.MultiFetch =>
+          requestLogger.trace("Handling multi-fetch request from " + channelFor(key).socket.getRemoteSocketAddress())
+        case RequestKeys.MultiProduce =>
+          requestLogger.trace("Handling multi-produce request from " + channelFor(key).socket.getRemoteSocketAddress())
+        case RequestKeys.Offsets =>
+          requestLogger.trace("Handling offset request from " + channelFor(key).socket.getRemoteSocketAddress())
+        case _ => throw new InvalidRequestException("No mapping found for handler id " + requestTypeId)
+      }
+    }
+    val handler = handlerMapping(requestTypeId, request)
+    if(handler == null)
+      throw new InvalidRequestException("No handler found for request")
+    val start = time.nanoseconds
+    val maybeSend = handler(request)
+    stats.recordRequest(requestTypeId, time.nanoseconds - start)
+    maybeSend
+  }
+  
+  /*
+   * Process reads from ready sockets
+   */
+  def read(key: SelectionKey) {
+    val socketChannel = channelFor(key)
+    var request = key.attachment.asInstanceOf[Receive]
+    if(key.attachment == null) {
+      request = new BoundedByteBufferReceive()
+      key.attach(request)
+    }
+    val read = request.readFrom(socketChannel)
+    stats.recordBytesRead(read)
+    if(logger.isTraceEnabled)
+      logger.trace(read + " bytes read from " + socketChannel.socket.getRemoteSocketAddress())
+    if(read < 0) {
+      close(key)
+      return
+    } else if(request.complete) {
+      val maybeResponse = handle(key, request)
+      key.attach(null)
+      // if there is a response, send it, otherwise do nothing
+      if(maybeResponse.isDefined) {
+        key.attach(maybeResponse.getOrElse(None))
+        key.interestOps(SelectionKey.OP_WRITE)
+      }
+    } else {
+      // more reading to be done
+      key.interestOps(SelectionKey.OP_READ)
+      selector.wakeup()
+    }
+  }
+  
+  /*
+   * Process writes to ready sockets
+   */
+  def write(key: SelectionKey) {
+    val response = key.attachment().asInstanceOf[Send]
+    val socketChannel = channelFor(key)
+    val written = response.writeTo(socketChannel)
+    stats.recordBytesWritten(written)
+    if(logger.isTraceEnabled)
+      logger.trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress())
+    if(response.complete) {
+      key.attach(null)
+      key.interestOps(SelectionKey.OP_READ)
+    } else {
+      key.interestOps(SelectionKey.OP_WRITE)
+      selector.wakeup()
+    }
+  }
+  
+  private def channelFor(key: SelectionKey) = key.channel().asInstanceOf[SocketChannel]
+
+}
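
[For illustration only, not part of the commit above: a minimal sketch of wiring up the SocketServer. It assumes Handler.HandlerMapping, which is referenced but not defined in this file, behaves as a function (requestTypeId: Short, request: Receive) => (Receive => Option[Send]); that is how Processor.handle invokes it. The sketch lives in a kafka subpackage because SocketServer and the network traits are private[kafka]; port, thread count and monitoring period are made-up values.]

    // Hypothetical wiring sketch; names and values are illustrative only.
    package kafka.sketch

    import kafka.network._

    object SocketServerSketch {
      def main(args: Array[String]) {
        // Hand every request type a handler that consumes the request and sends nothing back.
        val handlerFactory: Handler.HandlerMapping =
          (requestTypeId: Short, receive: Receive) => ((request: Receive) => None)

        val server = new SocketServer(9092,           // port
                                      3,              // numProcessorThreads
                                      600,            // monitoringPeriodSecs
                                      handlerFactory)
        server.startup()     // returns once the acceptor thread is listening
        // ... serve traffic ...
        server.shutdown()    // stops the acceptor and all processor threads
      }
    }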

Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util.concurrent.atomic._
+import javax.management._
+import kafka.utils._
+import kafka.api.RequestKeys
+
+trait SocketServerStatsMBean {
+  def getProduceRequestsPerSecond: Double
+  def getFetchRequestsPerSecond: Double
+  def getAvgProduceRequestMs: Double
+  def getMaxProduceRequestMs: Double
+  def getAvgFetchRequestMs: Double
+  def getMaxFetchRequestMs: Double
+  def getBytesReadPerSecond: Double
+  def getBytesWrittenPerSecond: Double
+  def getNumFetchRequests: Long
+  def getNumProduceRequests: Long
+}
+
+@threadsafe
+class SocketServerStats(val monitorDurationNs: Long, val time: Time) extends SocketServerStatsMBean {
+  
+  def this(monitorDurationNs: Long) = this(monitorDurationNs, SystemTime)
+  val produceTimeStats = new SnapshotStats(monitorDurationNs)
+  val fetchTimeStats = new SnapshotStats(monitorDurationNs)
+  val produceBytesStats = new SnapshotStats(monitorDurationNs)
+  val fetchBytesStats = new SnapshotStats(monitorDurationNs)
+
+  def recordRequest(requestTypeId: Short, durationNs: Long) {
+    requestTypeId match {
+      case r if r == RequestKeys.Produce || r == RequestKeys.MultiProduce =>
+        produceTimeStats.recordRequestMetric(durationNs)
+      case r if r == RequestKeys.Fetch || r == RequestKeys.MultiFetch =>
+        fetchTimeStats.recordRequestMetric(durationNs)
+      case _ => /* not collecting; let go */
+    }
+  }
+  
+  def recordBytesWritten(bytes: Int): Unit = fetchBytesStats.recordRequestMetric(bytes)
+
+  def recordBytesRead(bytes: Int): Unit = produceBytesStats.recordRequestMetric(bytes)
+
+  def getProduceRequestsPerSecond: Double = produceTimeStats.getRequestsPerSecond
+  
+  def getFetchRequestsPerSecond: Double = fetchTimeStats.getRequestsPerSecond
+
+  def getAvgProduceRequestMs: Double = produceTimeStats.getAvgMetric / (1000.0 * 1000.0)
+  
+  def getMaxProduceRequestMs: Double = produceTimeStats.getMaxMetric / (1000.0 * 1000.0)
+
+  def getAvgFetchRequestMs: Double = fetchTimeStats.getAvgMetric / (1000.0 * 1000.0)
+
+  def getMaxFetchRequestMs: Double = fetchTimeStats.getMaxMetric / (1000.0 * 1000.0)
+
+  def getBytesReadPerSecond: Double = produceBytesStats.getAvgMetric
+  
+  def getBytesWrittenPerSecond: Double = fetchBytesStats.getAvgMetric
+
+  def getNumFetchRequests: Long = fetchTimeStats.getNumRequests
+
+  def getNumProduceRequests: Long = produceTimeStats.getNumRequests
+}
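
[SocketServerStats follows the standard JMX MBean naming pattern (implementation class plus a *MBean interface), so it can be published through the platform MBean server. A sketch, not part of the commit; the ObjectName string below is made up.]

    // Illustrative JMX registration; the ObjectName is an assumption.
    import java.lang.management.ManagementFactory
    import javax.management.ObjectName
    import kafka.network.SocketServerStats

    object StatsJmxSketch {
      def main(args: Array[String]) {
        // 10 minute monitoring window, expressed in nanoseconds as SocketServer does.
        val stats = new SocketServerStats(1000L * 1000L * 1000L * 600)
        ManagementFactory.getPlatformMBeanServer.registerMBean(
          stats, new ObjectName("kafka:type=kafka.SocketServerStats"))
        // The getters (getProduceRequestsPerSecond, getAvgFetchRequestMs, ...) now
        // show up as JMX attributes, e.g. in jconsole.
      }
    }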

Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.nio._
+import java.nio.channels._
+import org.apache.log4j.Logger
+
+/**
+ * Represents a stateful transfer of data to or from the network
+ */
+private[network] trait Transmission {
+  
+  protected val logger: Logger = Logger.getLogger(getClass())
+  
+  def complete: Boolean
+  
+  protected def expectIncomplete(): Unit = {
+    if(complete)
+      throw new IllegalStateException("This operation cannot be completed on a complete request.")
+  }
+  
+  protected def expectComplete(): Unit = {
+    if(!complete)
+      throw new IllegalStateException("This operation cannot be completed on an incomplete request.")
+  }
+  
+}
+
+/**
+ * A transmission that is being received from a channel
+ */
+private[kafka] trait Receive extends Transmission {
+  
+  def buffer: ByteBuffer
+  
+  def readFrom(channel: ReadableByteChannel): Int
+  
+  def readCompletely(channel: ReadableByteChannel): Int = {
+    var read = 0
+    while(!complete) {
+      read = readFrom(channel)
+      if(logger.isTraceEnabled)
+        logger.trace(read + " bytes read.")
+    }
+    read
+  }
+  
+}
+
+/**
+ * A transmission that is being sent out to the channel
+ */
+private[kafka] trait Send extends Transmission {
+    
+  def writeTo(channel: WritableByteChannel): Int
+  
+  def writeCompletely(channel: WritableByteChannel): Int = {
+    var written = 0
+    while(!complete) {
+      written = writeTo(channel)
+      if(logger.isTraceEnabled)
+        logger.trace(written + " bytes written.")
+    }
+    written
+  }
+    
+}
+
+/**
+ * A set of composite sends, sent one after another
+ */
+abstract class MultiSend[S <: Send](val sends: List[S]) extends Send {
+  val expectedBytesToWrite: Int
+  private var current = sends
+  var totalWritten = 0
+
+  def writeTo(channel: WritableByteChannel): Int = {
+    expectIncomplete
+    val written = current.head.writeTo(channel)
+    totalWritten += written
+    if(current.head.complete)
+      current = current.tail
+    written
+  }
+  
+  def complete: Boolean = {
+    if (current == Nil) {
+      if (totalWritten != expectedBytesToWrite)
+        logger.error("mismatch in sending bytes over socket; expected: " + expectedBytesToWrite + " actual: " + totalWritten)
+      return true
+    }
+    else
+      return false
+  }
+}
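
[To make the Send contract above concrete, a minimal sketch (not from this commit) of a Send backed by a single ByteBuffer that the caller has already filled and flipped.]

    // Sketch only; kept in kafka.network because Send is private[kafka].
    package kafka.network

    import java.nio.ByteBuffer
    import java.nio.channels.WritableByteChannel

    private[kafka] class ByteBufferSendSketch(buffer: ByteBuffer) extends Send {

      // The transmission is finished once every remaining byte has been written.
      def complete: Boolean = !buffer.hasRemaining

      def writeTo(channel: WritableByteChannel): Int = {
        expectIncomplete()
        // A non-blocking channel may accept only part of the buffer; callers such
        // as Processor.write keep invoking writeTo until complete is true.
        channel.write(buffer)
      }
    }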

Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/package.html
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/package.html?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/package.html (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/package.html Mon Aug  1 23:41:24 2011
@@ -0,0 +1,11 @@
+The network server for kafka. No application specific code here, just general network server stuff.
+<br>
+The classes Receive and Send encapsulate the incoming and outgoing transmission of bytes. A Handler
+is a mapping between a Receive and a Send, and represents the user's hook to add logic for mapping requests
+to actual processing code. Any uncaught exceptions in the reading or writing of the transmissions will result in 
+the server logging an error and closing the offending socket. As a result it is the duty of the Handler
+implementation to catch and serialize any application-level errors that should be sent to the client.
+<br>
+This slightly lower-level interface that models sending and receiving rather than requests and responses
+is necessary in order to allow the send or receive to be overridden with a non-user-space writing of bytes
+using FileChannel.transferTo.
\ No newline at end of file
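
[The last paragraph above is the motivation for zero-copy sends; below is a sketch, not part of the commit, of a Send that hands bytes from a file straight to the socket with FileChannel.transferTo.]

    // Sketch only; kept in kafka.network because Send is private[kafka].
    package kafka.network

    import java.nio.channels.{FileChannel, WritableByteChannel}

    private[kafka] class FileChannelSendSketch(file: FileChannel, offset: Long, size: Long) extends Send {
      private var written = 0L

      def complete: Boolean = written >= size

      def writeTo(channel: WritableByteChannel): Int = {
        expectIncomplete()
        // transferTo may move fewer bytes than asked for; track progress so the
        // caller can keep invoking writeTo until complete is true.
        val transferred = file.transferTo(offset + written, size - written, channel)
        written += transferred
        transferred.toInt
      }
    }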

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.producer
+
+import collection.mutable.Map
+import collection.SortedSet
+import kafka.cluster.{Broker, Partition}
+
+trait BrokerPartitionInfo {
+  /**
+   * Return the set of (brokerId, partitionId) pairs registered for the given topic.
+   * @param topic the topic for which this information is to be returned
+   * @return a sorted set of Partition(brokerId, partitionId). Returns an empty
+   * set if no brokers are available.
+   */
+  def getBrokerPartitionInfo(topic: String = null): SortedSet[Partition]
+
+  /**
+   * Generate the host and port information for the broker identified
+   * by the given broker id 
+   * @param brokerId the broker for which the info is to be returned
+   * @return host and port of brokerId
+   */
+  def getBrokerInfo(brokerId: Int): Option[Broker]
+
+  /**
+   * Generate a mapping from broker id to the host and port for all brokers
+   * @return mapping from id to host and port of all brokers
+   */
+  def getAllBrokerInfo: Map[Int, Broker]
+
+  /**
+   * Cleanup
+   */
+  def close
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.producer
+
+import collection.mutable.HashMap
+import collection.mutable.Map
+import org.apache.log4j.Logger
+import collection.SortedSet
+import kafka.cluster.{Broker, Partition}
+import kafka.common.InvalidConfigException
+
+private[producer] class ConfigBrokerPartitionInfo(config: ProducerConfig) extends BrokerPartitionInfo {
+  private val logger = Logger.getLogger(classOf[ConfigBrokerPartitionInfo])
+  private val brokerPartitions: SortedSet[Partition] = getConfigTopicPartitionInfo
+  private val allBrokers = getConfigBrokerInfo
+
+  /**
+   * Return the static set of broker partitions built from the producer configuration
+   * @param topic ignored by this implementation (may be null)
+   * @return the sorted set of Partition(brokerId, partitionId) derived from broker.list
+   */
+  def getBrokerPartitionInfo(topic: String): SortedSet[Partition] = brokerPartitions
+
+  /**
+   * Generate the host and port information for the broker identified
+   * by the given broker id
+   * @param brokerId the broker for which the info is to be returned
+   * @return host and port of brokerId
+   */
+  def getBrokerInfo(brokerId: Int): Option[Broker] = {
+    allBrokers.get(brokerId)
+  }
+
+  /**
+   * Generate a mapping from broker id to the host and port for all brokers
+   * @return mapping from id to host and port of all brokers
+   */
+  def getAllBrokerInfo: Map[Int, Broker] = allBrokers
+
+  def close {}
+
+  /**
+   * Generate the set of broker partitions for all brokers
+   * specified in the producer configuration
+   * @return sorted set of Partition(brokerId, partitionId), one partition per listed broker
+   */
+  private def getConfigTopicPartitionInfo(): SortedSet[Partition] = {
+    val brokerInfoList = config.brokerPartitionInfo.split(",")
+    if(brokerInfoList.size == 0) throw new InvalidConfigException("broker.list is empty")
+    // check if each individual broker info is valid => (brokerId: brokerHost: brokerPort)
+    brokerInfoList.foreach { bInfo =>
+      val brokerInfo = bInfo.split(":")
+      if(brokerInfo.size < 3) throw new InvalidConfigException("broker.list has invalid value")
+    }
+    val brokerPartitions = brokerInfoList.map(bInfo => (bInfo.split(":").head.toInt, 1))
+    var brokerParts = SortedSet.empty[Partition]
+    brokerPartitions.foreach { bp =>
+      for(i <- 0 until bp._2) {
+        val bidPid = new Partition(bp._1, i)
+        brokerParts = brokerParts + bidPid
+      }
+    }
+    brokerParts
+  }
+
+  /**
+   * Generate the host and port information for all brokers
+   * specified in the producer configuration
+   * @return mapping from brokerId to (host, port) for all brokers
+   */
+  private def getConfigBrokerInfo(): Map[Int, Broker] = {
+    val brokerInfo = new HashMap[Int, Broker]()
+    val brokerInfoList = config.brokerPartitionInfo.split(",")
+    brokerInfoList.foreach{ bInfo =>
+      val brokerIdHostPort = bInfo.split(":")
+      brokerInfo += (brokerIdHostPort(0).toInt -> new Broker(brokerIdHostPort(0).toInt, brokerIdHostPort(1),
+        brokerIdHostPort(1), brokerIdHostPort(2).toInt))
+    }
+    brokerInfo
+  }
+
+}
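
[To make the broker.list format concrete, a hedged snippet; the broker ids, hosts and ports are invented.]

    // Illustrative Scala script snippet.
    // broker.list format parsed above: "brokerId:host:port,brokerId:host:port,..."
    val props = new java.util.Properties()
    props.put("broker.list", "0:broker0.example.com:9092,1:broker1.example.com:9092")
    // Fed into a ProducerConfig, getConfigTopicPartitionInfo yields the sorted set
    // { Partition(0, 0), Partition(1, 0) } -- one partition per listed broker -- and
    // getConfigBrokerInfo maps ids 0 and 1 to Broker instances for those host/port pairs.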

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/DefaultPartitioner.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/DefaultPartitioner.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/DefaultPartitioner.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+private[kafka] class DefaultPartitioner[T] extends Partitioner[T] {
+  private val random = new java.util.Random
+  
+  def partition(key: T, numPartitions: Int): Int = {
+    if(key == null)
+      random.nextInt(numPartitions)
+    else
+      key.hashCode % numPartitions
+  }
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+import async.MissingConfigException
+import org.apache.log4j.spi.LoggingEvent
+import org.apache.log4j.{Logger, AppenderSkeleton}
+import kafka.utils.Utils
+import kafka.serializer.Encoder
+import java.util.{Properties, Date}
+import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+
+class KafkaLog4jAppender extends AppenderSkeleton {
+  var port:Int = 0
+  var host:String = null
+  var topic:String = null
+  var encoderClass:String = null
+  
+  private var producer:SyncProducer = null
+  private val logger = Logger.getLogger(classOf[KafkaLog4jAppender])
+  private var encoder: Encoder[AnyRef] = null
+  
+  def getPort:Int = port
+  def setPort(port: Int) = { this.port = port }
+
+  def getHost:String = host
+  def setHost(host: String) = { this.host = host }
+
+  def getTopic:String = topic
+  def setTopic(topic: String) = { this.topic = topic }
+
+  def getEncoder:String = encoderClass
+  def setEncoder(encoder: String) = { this.encoderClass = encoder }
+  
+  override def activateOptions = {
+    // check for config parameter validity
+    if(host == null)
+      throw new MissingConfigException("Broker Host must be specified by the Kafka log4j appender")
+    if(port == 0)
+      throw new MissingConfigException("Broker Port must be specified by the Kafka log4j appender") 
+    if(topic == null)
+      throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
+    if(encoderClass == null) {
+      logger.info("Using default encoder - kafka.producer.DefaultStringEncoder")
+      encoder = Utils.getObject("kafka.producer.DefaultStringEncoder")
+    }else // instantiate the encoder, if present
+      encoder = Utils.getObject(encoderClass)
+    val props = new Properties()
+    props.put("host", host)
+    props.put("port", port.toString)
+    producer = new SyncProducer(new SyncProducerConfig(props))
+    logger.info("Kafka producer connected to " + host + "," + port)
+    logger.info("Logging for topic: " + topic)
+  }
+  
+  override def append(event: LoggingEvent) = {
+    if (logger.isDebugEnabled){
+      logger.debug("[" + new Date(event.getTimeStamp).toString + "]" + event.getRenderedMessage +
+            " for " + host + "," + port)
+    }
+    val message = encoder.toMessage(event)
+    producer.send(topic, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message))
+  }
+
+  override def close = {
+    if(!this.closed) {
+      this.closed = true
+      producer.close
+    }
+  }
+
+  override def requiresLayout: Boolean = false
+}
+
+class DefaultStringEncoder extends Encoder[LoggingEvent] {
+  override def toMessage(event: LoggingEvent):Message = new Message(event.getMessage.asInstanceOf[String].getBytes)
+}
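
[A configuration sketch for the appender above. It is usually wired through log4j.properties using the Host/Port/Topic/Encoder setter names; the programmatic equivalent below sticks to the setters actually defined in this file. Host, port, topic and logger name are made up.]

    // Illustrative programmatic setup; host, port and topic are invented values.
    import org.apache.log4j.Logger
    import kafka.producer.KafkaLog4jAppender

    object KafkaAppenderSketch {
      def main(args: Array[String]) {
        val appender = new KafkaLog4jAppender
        appender.setHost("broker0.example.com")
        appender.setPort(9092)
        appender.setTopic("log4j-events")
        // No encoder set: activateOptions falls back to kafka.producer.DefaultStringEncoder.
        appender.activateOptions
        Logger.getRootLogger.addAppender(appender)
        Logger.getLogger("my.app").info("hello kafka")   // delivered as a Kafka message
      }
    }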

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/Partitioner.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/Partitioner.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/Partitioner.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/Partitioner.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.producer
+
+trait Partitioner[T] {
+  /**
+   * Uses the key to calculate a partition bucket id for routing
+   * the data to the appropriate broker partition
+   * @return an integer between 0 and numPartitions-1
+   */
+  def partition(key: T, numPartitions: Int): Int
+}
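
[DefaultPartitioner above hashes with key.hashCode % numPartitions, which can be negative when the hashCode is negative; a user-supplied Partitioner selected through partitioner.class can keep the result inside the documented 0..numPartitions-1 range. A sketch with made-up class and package names:]

    // Sketch only; the class name and package are invented.
    package example

    import kafka.producer.Partitioner

    class MemberIdPartitioner extends Partitioner[java.lang.Integer] {
      // Map the same member id to the same partition, staying in 0..numPartitions-1
      // even for negative ids (plain % could go negative).
      def partition(key: java.lang.Integer, numPartitions: Int): Int =
        ((key.intValue % numPartitions) + numPartitions) % numPartitions
    }
    // Selected with props.put("partitioner.class", "example.MemberIdPartitioner")
    // (only when zk.connect is used; broker.list forbids a custom partitioner.class).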

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+import async.{CallbackHandler, EventHandler}
+import org.apache.log4j.Logger
+import kafka.serializer.Encoder
+import kafka.utils._
+import java.util.Properties
+import kafka.cluster.{Partition, Broker}
+import java.util.concurrent.atomic.AtomicBoolean
+import kafka.api.ProducerRequest
+import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException}
+
+class Producer[K,V](config: ProducerConfig,
+                    partitioner: Partitioner[K],
+                    producerPool: ProducerPool[V],
+                    populateProducerPool: Boolean,
+                    private var brokerPartitionInfo: BrokerPartitionInfo) /* this constructor is for testing purposes only; */
+                                                                          /* applications should use the other constructors */
+{
+  private val logger = Logger.getLogger(classOf[Producer[K, V]])
+  private val hasShutdown = new AtomicBoolean(false)
+  if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerPartitionInfo))
+    throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
+  private val random = new java.util.Random
+  // check if zookeeper based auto partition discovery is enabled
+  private val zkEnabled = Utils.propertyExists(config.zkConnect)
+  if(brokerPartitionInfo == null) {
+    zkEnabled match {
+      case true =>
+        val zkProps = new Properties()
+        zkProps.put("zk.connect", config.zkConnect)
+        zkProps.put("zk.sessiontimeout.ms", config.zkSessionTimeoutMs.toString)
+        zkProps.put("zk.connectiontimeout.ms", config.zkConnectionTimeoutMs.toString)
+        zkProps.put("zk.synctime.ms", config.zkSyncTimeMs.toString)
+        brokerPartitionInfo = new ZKBrokerPartitionInfo(new ZKConfig(zkProps), producerCbk)
+      case false =>
+        brokerPartitionInfo = new ConfigBrokerPartitionInfo(config)
+    }
+  }
+  // pool of producers, one per broker
+  if(populateProducerPool) {
+    val allBrokers = brokerPartitionInfo.getAllBrokerInfo
+    allBrokers.foreach(b => producerPool.addProducer(new Broker(b._1, b._2.host, b._2.host, b._2.port)))
+  }
+
+/**
+ * This constructor can be used when all config parameters will be specified through the
+ * ProducerConfig object
+ * @param config Producer Configuration object
+ */
+  def this(config: ProducerConfig) =  this(config, Utils.getObject(config.partitionerClass),
+    new ProducerPool[V](config, Utils.getObject(config.serializerClass)), true, null)
+
+  /**
+   * This constructor can be used to provide pre-instantiated objects for all config parameters
+   * that would otherwise be instantiated via reflection. i.e. encoder, partitioner, event handler and
+   * callback handler. If you use this constructor, encoder, eventHandler, callback handler and partitioner
+   * will not be picked up from the config.
+   * @param config Producer Configuration object
+   * @param encoder Encoder used to convert an object of type V to a kafka.message.Message. If this is null it
+   * throws an InvalidConfigException
+   * @param eventHandler the class that implements kafka.producer.async.IEventHandler[T] used to
+   * dispatch a batch of produce requests, using an instance of kafka.producer.SyncProducer. If this is null, it
+   * uses the DefaultEventHandler
+   * @param cbkHandler the class that implements kafka.producer.async.CallbackHandler[T] used to inject
+   * callbacks at various stages of the kafka.producer.AsyncProducer pipeline. If this is null, the producer does
+   * not use the callback handler and hence does not invoke any callbacks
+   * @param partitioner class that implements the kafka.producer.Partitioner[K], used to supply a custom
+   * partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T]
+   * object in the  send API. If this is null, producer uses DefaultPartitioner
+   */
+  def this(config: ProducerConfig,
+           encoder: Encoder[V],
+           eventHandler: EventHandler[V],
+           cbkHandler: CallbackHandler[V],
+           partitioner: Partitioner[K]) =
+    this(config, if(partitioner == null) new DefaultPartitioner[K] else partitioner,
+         new ProducerPool[V](config, encoder, eventHandler, cbkHandler), true, null)
+  /**
+   * Sends the data, partitioned by key to the topic using either the
+   * synchronous or the asynchronous producer
+   * @param producerData the producer data object that encapsulates the topic, key and message data
+   */
+  def send(producerData: ProducerData[K,V]*) {
+    val producerPoolRequests = producerData.map { pd =>
+    // find the number of broker partitions registered for this topic
+      logger.debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
+      val numBrokerPartitions = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
+      logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + numBrokerPartitions)
+      val totalNumPartitions = numBrokerPartitions.length
+      if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
+
+      var brokerIdPartition: Partition = null
+      var partition: Int = 0
+      if(zkEnabled) {
+        // get the partition id
+        val partitionId = getPartition(pd.getKey, totalNumPartitions)
+        brokerIdPartition = numBrokerPartitions(partitionId)
+        val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
+        logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
+                " on partition " + brokerIdPartition.partId)
+        partition = brokerIdPartition.partId
+      }else {
+        // randomly select a broker
+        val randomBrokerId = random.nextInt(totalNumPartitions)
+        brokerIdPartition = numBrokerPartitions(randomBrokerId)
+        val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
+
+        logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
+                " on a randomly chosen partition")
+        partition = ProducerRequest.RandomPartition
+      }
+      producerPool.getProducerPoolData(pd.getTopic,
+                                       new Partition(brokerIdPartition.brokerId, partition),
+                                       pd.getData)
+    }
+    producerPool.send(producerPoolRequests: _*)
+  }
+
+  /**
+   * Retrieves the partition id and throws an InvalidPartitionException if
+   * the value of partition is not between 0 and numPartitions-1
+   * @param key the partition key
+   * @param numPartitions the total number of available partitions
+   * @return the partition id
+   */
+  private def getPartition(key: K, numPartitions: Int): Int = {
+    if(numPartitions <= 0)
+      throw new InvalidPartitionException("Invalid number of partitions: " + numPartitions +
+              "\n Valid values are > 0")
+    val partition = if(key == null) random.nextInt(numPartitions)
+                    else partitioner.partition(key , numPartitions)
+    if(partition < 0 || partition >= numPartitions)
+      throw new InvalidPartitionException("Invalid partition id : " + partition +
+              "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
+    partition
+  }
+  
+  /**
+   * Callback to add a new producer to the producer pool. Used by ZKBrokerPartitionInfo
+   * on registration of new broker in zookeeper
+   * @param bid the id of the broker
+   * @param host the hostname of the broker
+   * @param port the port of the broker
+   */
+  private def producerCbk(bid: Int, host: String, port: Int) =  {
+    if(populateProducerPool) producerPool.addProducer(new Broker(bid, host, host, port))
+    else logger.debug("Skipping the callback since populateProducerPool = false")
+  }
+
+  /**
+   * Close API to close the producer pool connections to all Kafka brokers. Also closes
+   * the zookeeper client connection if one exists
+   */
+  def close() = {
+    val canShutdown = hasShutdown.compareAndSet(false, true)
+    if(canShutdown) {
+      producerPool.close
+      brokerPartitionInfo.close
+    }
+  }
+}
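
[Not part of the commit: a usage sketch of the high-level Producer with static broker discovery. The topic, broker address and encoder class name are assumptions; broker.list and zk.connect are the keys used by ProducerConfig/ZKConfig in this revision, and serializer.class is assumed from how ProducerPool forwards it.]

    // Illustrative only; topic, broker address and encoder class are invented.
    import java.util.Properties
    import kafka.producer.{Producer, ProducerConfig, ProducerData}

    object ProducerSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("broker.list", "0:broker0.example.com:9092")       // static broker discovery
        // props.put("zk.connect", "zk1.example.com:2181")            // ...or zookeeper-based discovery
        props.put("serializer.class", "my.app.StringMessageEncoder")  // assumed Encoder[String] implementation

        val producer = new Producer[String, String](new ProducerConfig(props))
        // With zk.connect the key is routed through the partitioner; with broker.list
        // a broker is picked at random (see Producer.send above).
        producer.send(new ProducerData[String, String]("my-topic", "user-17", List("hello")))
        producer.close()
      }
    }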

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+import async.AsyncProducerConfigShared
+import java.util.Properties
+import kafka.utils.{ZKConfig, Utils}
+import kafka.common.InvalidConfigException
+
+class ProducerConfig(val props: Properties) extends ZKConfig(props) 
+        with AsyncProducerConfigShared with SyncProducerConfigShared{
+
+  /** For bypassing zookeeper based auto partition discovery, use this config   *
+   *  to pass in static broker and per-broker partition information. Format-    *
+   *  brokerid1:host1:port1, brokerid2:host2:port2*/
+  val brokerPartitionInfo = Utils.getString(props, "broker.list", null)
+  if(brokerPartitionInfo != null && Utils.getString(props, "partitioner.class", null) != null)
+    throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set")
+
+  /** the partitioner class for partitioning events amongst sub-topics */
+  val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")
+
+  /** this parameter specifies whether the messages are sent asynchronously *
+   * or not. Valid values are - async for asynchronous send                 *
+   *                            sync for synchronous send                   */
+  val producerType = Utils.getString(props, "producer.type", "sync")
+
+  /**
+   * This parameter allows you to specify the compression codec for all data generated *
+   * by this producer. The default is NoCompressionCodec
+   */
+  val compressionCodec = Utils.getCompressionCodec(props, "compression.codec")
+
+  /** This parameter allows you to set whether compression should be turned *
+   *  on for particular topics
+   *
+   *  If the compression codec is anything other than NoCompressionCodec,
+   *
+   *    Enable compression only for specified topics if any
+   *
+   *    If the list of compressed topics is empty, then enable the specified compression codec for all topics
+   *
+   *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
+   */
+  val compressedTopics = Utils.getCSVList(Utils.getString(props, "compressed.topics", null))
+}
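
[The compression settings above in one place, as a hedged snippet; the numeric codec value is an assumption, since Utils.getCompressionCodec is not part of this diff.]

    // Illustrative Scala script snippet; "1" as the gzip codec id is an assumption,
    // the accepted values are defined by Utils.getCompressionCodec (not shown here).
    val props = new java.util.Properties()
    props.put("compression.codec", "1")
    // Compress only these topics; with a real codec and an empty/absent list,
    // compression applies to all topics.
    props.put("compressed.topics", "clicks,impressions")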

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerData.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerData.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerData.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerData.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+/**
+ * Represents the data to be sent using the Producer send API
+ * @param topic the topic under which the message is to be published
+ * @param key the key used by the partitioner to pick a broker partition
+ * @param data variable length data to be published as Kafka messages under topic
+ */
+class ProducerData[K, V](private val topic: String,
+                         private val key: K,
+                         private val data: Seq[V]) {
+
+  def this(t: String, d: Seq[V]) = this(topic = t, key = null.asInstanceOf[K], data = d)
+
+  def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = List(d))
+
+  def getTopic: String = topic
+
+  def getKey: K = key
+
+  def getData: Seq[V] = data
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+import async._
+import java.util.Properties
+import kafka.serializer.Encoder
+import org.apache.log4j.Logger
+import java.util.concurrent.{ConcurrentMap, ConcurrentHashMap}
+import kafka.cluster.{Partition, Broker}
+import kafka.api.ProducerRequest
+import kafka.common.{UnavailableProducerException, InvalidConfigException}
+import kafka.utils.Utils
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
+
+class ProducerPool[V](private val config: ProducerConfig,
+                      private val serializer: Encoder[V],
+                      private val syncProducers: ConcurrentMap[Int, SyncProducer],
+                      private val asyncProducers: ConcurrentMap[Int, AsyncProducer[V]],
+                      private val inputEventHandler: EventHandler[V] = null,
+                      private val cbkHandler: CallbackHandler[V] = null) {
+
+  private val logger = Logger.getLogger(classOf[ProducerPool[V]])
+  private var eventHandler = inputEventHandler
+  if(eventHandler == null)
+    eventHandler = new DefaultEventHandler(config, cbkHandler)
+
+  if(serializer == null)
+    throw new InvalidConfigException("serializer passed in is null!")
+
+  private var sync: Boolean = true
+  config.producerType match {
+    case "sync" =>
+    case "async" => sync = false
+    case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
+  }
+
+  def this(config: ProducerConfig, serializer: Encoder[V],
+           eventHandler: EventHandler[V], cbkHandler: CallbackHandler[V]) =
+    this(config, serializer,
+         new ConcurrentHashMap[Int, SyncProducer](),
+         new ConcurrentHashMap[Int, AsyncProducer[V]](),
+         eventHandler, cbkHandler)
+
+  def this(config: ProducerConfig, serializer: Encoder[V]) = this(config, serializer,
+                                                                  new ConcurrentHashMap[Int, SyncProducer](),
+                                                                  new ConcurrentHashMap[Int, AsyncProducer[V]](),
+                                                                  Utils.getObject(config.eventHandler),
+                                                                  Utils.getObject(config.cbkHandler))
+  /**
+   * Add a new producer, either synchronous or asynchronous, connecting
+   * to the specified broker
+   * @param broker the broker (id, host and port) to create the producer for
+   */
+  def addProducer(broker: Broker) {
+    if(sync) {
+        val props = new Properties()
+        props.put("host", broker.host)
+        props.put("port", broker.port.toString)
+        props.put("buffer.size", config.bufferSize.toString)
+        props.put("connect.timeout.ms", config.connectTimeoutMs.toString)
+        props.put("reconnect.interval", config.reconnectInterval.toString)
+        val producer = new SyncProducer(new SyncProducerConfig(props))
+        logger.info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
+        syncProducers.put(broker.id, producer)
+    } else {
+        val props = new Properties()
+        props.put("host", broker.host)
+        props.put("port", broker.port.toString)
+        props.put("queue.time", config.queueTime.toString)
+        props.put("queue.size", config.queueSize.toString)
+        props.put("batch.size", config.batchSize.toString)
+        props.put("serializer.class", config.serializerClass)
+        val producer = new AsyncProducer[V](new AsyncProducerConfig(props),
+                                            new SyncProducer(new SyncProducerConfig(props)),
+                                            serializer,
+                                            eventHandler, config.eventHandlerProps,
+                                            cbkHandler, config.cbkHandlerProps)
+        producer.start
+        logger.info("Creating async producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
+        asyncProducers.put(broker.id, producer)
+    }
+  }
+
+  /**
+   * selects either a synchronous or an asynchronous producer, for
+   * the specified broker id and calls the send API on the selected
+   * producer to publish the data to the specified broker partition
+   * @param poolData the producer pool request object
+   */
+  def send(poolData: ProducerPoolData[V]*) {
+    val distinctBrokers = poolData.map(pd => pd.getBidPid.brokerId).distinct
+    var remainingRequests = poolData.toSeq
+    distinctBrokers.foreach { bid =>
+      val requestsForThisBid = remainingRequests partition (_.getBidPid.brokerId == bid)
+      remainingRequests = requestsForThisBid._2
+
+      if(sync) {
+        val producerRequests = requestsForThisBid._1.map(req => new ProducerRequest(req.getTopic, req.getBidPid.partId,
+          new ByteBufferMessageSet(compressionCodec = config.compressionCodec,
+                                   messages = req.getData.map(d => serializer.toMessage(d)): _*)))
+        logger.debug("Fetching sync producer for broker id: " + bid)
+        val producer = syncProducers.get(bid)
+        if(producer != null) {
+          if(producerRequests.size > 1)
+            producer.multiSend(producerRequests.toArray)
+          else
+            producer.send(topic = producerRequests(0).topic,
+                          partition = producerRequests(0).partition,
+                          messages = producerRequests(0).messages)
+          config.compressionCodec match {
+            case NoCompressionCodec => logger.debug("Sending message to broker " + bid)
+            case _ => logger.debug("Sending compressed messages to broker " + bid)
+          }
+        }else
+          throw new UnavailableProducerException("Producer pool has not been initialized correctly. " +
+            "Sync Producer for broker " + bid + " does not exist in the pool")
+      }else {
+        logger.debug("Fetching async producer for broker id: " + bid)
+        val producer = asyncProducers.get(bid)
+        if(producer != null) {
+          requestsForThisBid._1.foreach { req =>
+            req.getData.foreach(d => producer.send(req.getTopic, d, req.getBidPid.partId))
+          }
+          if(logger.isDebugEnabled)
+            config.compressionCodec match {
+              case NoCompressionCodec => logger.debug("Sending message")
+              case _ => logger.debug("Sending compressed messages")
+            }
+        }
+        else
+          throw new UnavailableProducerException("Producer pool has not been initialized correctly. " +
+            "Async Producer for broker " + bid + " does not exist in the pool")
+      }
+    }
+  }
+
+  /**
+   * Closes all the producers in the pool
+   */
+  def close() = {
+    config.producerType match {
+      case "sync" =>
+        logger.info("Closing all sync producers")
+        val iter = syncProducers.values.iterator
+        while(iter.hasNext)
+          iter.next.close
+      case "async" =>
+        logger.info("Closing all async producers")
+        val iter = asyncProducers.values.iterator
+        while(iter.hasNext)
+          iter.next.close
+    }
+  }
+
+  /**
+   * This constructs and returns the request object for the producer pool
+   * @param topic the topic to which the data should be published
+   * @param bidPid the broker id and partition id
+   * @param data the data to be published
+   */
+  def getProducerPoolData(topic: String, bidPid: Partition, data: Seq[V]): ProducerPoolData[V] = {
+    new ProducerPoolData[V](topic, bidPid, data)
+  }
+
+  class ProducerPoolData[V](topic: String,
+                            bidPid: Partition,
+                            data: Seq[V]) {
+    def getTopic: String = topic
+    def getBidPid: Partition = bidPid
+    def getData: Seq[V] = data
+  }
+}
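
For orientation, a minimal, hypothetical sketch of how a caller might drive the ProducerPool send path defined above. Pool construction, the serializer, and the broker/partition choice all happen elsewhere (in the high-level Producer), so the topic name and Partition used here are made-up placeholders rather than real wiring.

    // Hypothetical caller-side sketch (not part of this commit): given an already-
    // constructed ProducerPool[String], publish two strings to partition 0 of
    // broker 0 for a made-up topic.
    import kafka.cluster.Partition

    def publishExample(pool: ProducerPool[String]) {
      val bidPid = new Partition(0, 0)   // normally chosen by the partitioner, not hard-coded
      val request = pool.getProducerPoolData("test-topic", bidPid, Seq("hello", "world"))
      pool.send(request)                 // routed to a sync or async producer based on producer.type
    }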

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer
+
+import java.net._
+import java.nio.channels._
+import kafka.message._
+import kafka.network._
+import kafka.utils._
+import kafka.api._
+import scala.math._
+import org.apache.log4j.{Level, Logger}
+import kafka.common.MessageSizeTooLargeException
+import java.nio.ByteBuffer
+
+object SyncProducer {
+  val RequestKey: Short = 0
+}
+
+/*
+ * A producer that synchronously sends message sets to a single broker over a blocking socket.
+ */
+@threadsafe
+class SyncProducer(val config: SyncProducerConfig) {
+  
+  private val logger = Logger.getLogger(getClass())
+  private val MaxConnectBackoffMs = 60000
+  private var channel : SocketChannel = null
+  private var sentOnConnection = 0
+  private val lock = new Object()
+  @volatile
+  private var shutdown: Boolean = false
+
+  logger.debug("Instantiating Scala Sync Producer")
+
+  private def verifySendBuffer(buffer : ByteBuffer) = {
+    if (logger.isTraceEnabled) {
+      logger.trace("verifying sendbuffer of size " + buffer.limit)
+      val requestTypeId = buffer.getShort()
+      if (requestTypeId == RequestKeys.MultiProduce) {
+        try {
+          val request = MultiProducerRequest.readFrom(buffer)
+          for (produce <- request.produces) {
+            try {
+              for (messageAndOffset <- produce.messages)
+                if (!messageAndOffset.message.isValid)
+                  logger.trace("topic " + produce.topic + " is invalid")
+            }
+            catch {
+              case e: Throwable =>
+              logger.trace("error iterating messages " + e + Utils.stackTrace(e))
+            }
+          }
+        }
+        catch {
+          case e: Throwable =>
+            logger.trace("error verifying sendbuffer " + e + Utils.stackTrace(e))
+        }
+      }
+    }
+  }
+
+  /**
+   * Common functionality for the public send methods
+   */
+  private def send(send: BoundedByteBufferSend) {
+    lock synchronized {
+      verifySendBuffer(send.buffer.slice)
+      val startTime = SystemTime.nanoseconds
+      getOrMakeConnection()
+
+      try {
+        send.writeCompletely(channel)
+      } catch {
+        case e : java.io.IOException =>
+          // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
+          disconnect()
+          throw e
+        case e2 =>
+          throw e2
+      }
+      // TODO: do we still need this?
+      sentOnConnection += 1
+      if(sentOnConnection >= config.reconnectInterval) {
+        disconnect()
+        channel = connect()
+        sentOnConnection = 0
+      }
+      val endTime = SystemTime.nanoseconds
+      SyncProducerStats.recordProduceRequest(endTime - startTime)
+    }
+  }
+
+  /**
+   * Send a message set to the given topic and partition
+   */
+  def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
+    verifyMessageSize(messages)
+    val setSize = messages.sizeInBytes.asInstanceOf[Int]
+    if(logger.isTraceEnabled)
+      logger.trace("Got message set with " + setSize + " bytes to send")
+    send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages)))
+  }
+ 
+  def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, ProducerRequest.RandomPartition, messages)
+
+  def multiSend(produces: Array[ProducerRequest]) {
+    for (request <- produces)
+      verifyMessageSize(request.messages)
+    val setSize = produces.foldLeft(0L)(_ + _.messages.sizeInBytes)
+    if(logger.isTraceEnabled)
+      logger.trace("Got multi message sets with " + setSize + " bytes to send")
+    send(new BoundedByteBufferSend(new MultiProducerRequest(produces)))
+  }
+
+  def close() = {
+    lock synchronized {
+      disconnect()
+      shutdown = true
+    }
+  }
+
+  private def verifyMessageSize(messages: ByteBufferMessageSet) {
+    for (messageAndOffset <- messages)
+      if (messageAndOffset.message.payloadSize > config.maxMessageSize)
+        throw new MessageSizeTooLargeException
+  }
+
+  /**
+   * Disconnect from current channel, closing connection.
+   * Side effect: channel field is set to null on successful disconnect
+   */
+  private def disconnect() {
+    try {
+      if(channel != null) {
+        logger.info("Disconnecting from " + config.host + ":" + config.port)
+        Utils.swallow(logger.warn, channel.close())
+        Utils.swallow(logger.warn, channel.socket.close())
+        channel = null
+      }
+    } catch {
+      case e: Exception => logger.error("Error on disconnect: ", e)
+    }
+  }
+    
+  private def connect(): SocketChannel = {
+    var channel: SocketChannel = null
+    var connectBackoffMs = 1
+    val beginTimeMs = SystemTime.milliseconds
+    while(channel == null && !shutdown) {
+      try {
+        channel = SocketChannel.open()
+        channel.socket.setSendBufferSize(config.bufferSize)
+        channel.configureBlocking(true)
+        channel.socket.setSoTimeout(config.socketTimeoutMs)
+        channel.socket.setKeepAlive(true)
+        channel.connect(new InetSocketAddress(config.host, config.port))
+        logger.info("Connected to " + config.host + ":" + config.port + " for producing")
+      }
+      catch {
+        case e: Exception => {
+          disconnect()
+          val endTimeMs = SystemTime.milliseconds
+          if ( (endTimeMs - beginTimeMs + connectBackoffMs) > config.connectTimeoutMs)
+          {
+            logger.error("Producer connection timing out after " + config.connectTimeoutMs + " ms", e)
+            throw e
+          }
+          logger.error("Connection attempt failed, next attempt in " + connectBackoffMs + " ms", e)
+          SystemTime.sleep(connectBackoffMs)
+          connectBackoffMs = min(10 * connectBackoffMs, MaxConnectBackoffMs)
+        }
+      }
+    }
+    channel
+  }
+
+  private def getOrMakeConnection() {
+    if(channel == null) {
+      channel = connect()
+    }
+  }
+}
+
+trait SyncProducerStatsMBean {
+  def getProduceRequestsPerSecond: Double
+  def getAvgProduceRequestMs: Double
+  def getMaxProduceRequestMs: Double
+  def getNumProduceRequests: Long
+}
+
+@threadsafe
+class SyncProducerStats extends SyncProducerStatsMBean {
+  private val produceRequestStats = new SnapshotStats
+
+  def recordProduceRequest(requestNs: Long) = produceRequestStats.recordRequestMetric(requestNs)
+
+  def getProduceRequestsPerSecond: Double = produceRequestStats.getRequestsPerSecond
+
+  def getAvgProduceRequestMs: Double = produceRequestStats.getAvgMetric / (1000.0 * 1000.0)
+
+  def getMaxProduceRequestMs: Double = produceRequestStats.getMaxMetric / (1000.0 * 1000.0)
+
+  def getNumProduceRequests: Long = produceRequestStats.getNumRequests
+}
+
+object SyncProducerStats {
+  private val logger = Logger.getLogger(getClass())
+  private val kafkaProducerstatsMBeanName = "kafka:type=kafka.KafkaProducerStats"
+  private val stats = new SyncProducerStats
+  Utils.swallow(logger.warn, Utils.registerMBean(stats, kafkaProducerstatsMBeanName))
+
+  def recordProduceRequest(requestMs: Long) = stats.recordProduceRequest(requestMs)
+}
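
As a side note on the connect() retry loop above: each failed attempt sleeps for connectBackoffMs, then multiplies the backoff tenfold (capped at MaxConnectBackoffMs, i.e. 60 seconds), and the loop gives up once the elapsed time plus the next backoff would exceed connect.timeout.ms. A tiny standalone sketch of the resulting sleep schedule:

    // Sketch of the backoff schedule produced by SyncProducer.connect() above.
    val maxConnectBackoffMs = 60000
    val backoffs = Iterator.iterate(1)(b => math.min(10 * b, maxConnectBackoffMs))
    println(backoffs.take(7).mkString(" ms, ") + " ms")   // 1 ms, 10 ms, 100 ms, 1000 ms, 10000 ms, 60000 ms, 60000 ms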

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+import kafka.utils.Utils
+import java.util.Properties
+import kafka.message.{CompressionUtils, CompressionCodec}
+
+class SyncProducerConfig(val props: Properties) extends SyncProducerConfigShared {
+  /** the broker to which the producer sends events */
+  val host = Utils.getString(props, "host")
+
+  /** the port on which the broker is running */
+  val port = Utils.getInt(props, "port")
+}
+
+trait SyncProducerConfigShared {
+  val props: Properties
+  
+  val bufferSize = Utils.getInt(props, "buffer.size", 100*1024)
+
+  val connectTimeoutMs = Utils.getInt(props, "connect.timeout.ms", 5000)
+
+  /** the socket timeout for network requests */
+  val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", 30000)  
+
+  val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
+
+  val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
+}
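
Putting the two files above together, a minimal, hypothetical usage sketch of the blocking send path. The broker address, topic name and payload are made-up values, the property keys are the ones read by SyncProducerConfig and SyncProducerConfigShared, and Message's byte-array constructor is assumed.

    import java.util.Properties
    import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
    import kafka.producer.{SyncProducer, SyncProducerConfig}

    val props = new Properties
    props.put("host", "localhost")   // assumed broker host
    props.put("port", "9092")        // assumed broker port
    props.put("buffer.size", (64 * 1024).toString)

    val producer = new SyncProducer(new SyncProducerConfig(props))
    val messages = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                            messages = new Message("hello kafka".getBytes))
    producer.send("test-topic", 0, messages)   // blocks until the request is written to the socket
    producer.close()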

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,359 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.producer
+
+import kafka.utils.{StringSerializer, ZkUtils, ZKConfig}
+import collection.mutable.HashMap
+import collection.mutable.Map
+import org.apache.log4j.Logger
+import collection.immutable.TreeSet
+import kafka.cluster.{Broker, Partition}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
+import collection.SortedSet
+
+private[producer] object ZKBrokerPartitionInfo {
+  /**
+   * Generate the sorted set of (brokerId, partitionId) pairs for the list of brokers
+   * specified
+   * @param topic the topic to which the brokers have registered
+   * @param brokerList the list of brokers for which the partition info is to be generated
+   * @return a sorted set of (brokerId, partitionId) pairs for the brokers in brokerList
+   */
+  private def getBrokerPartitions(zkClient: ZkClient, topic: String, brokerList: List[Int]): SortedSet[Partition] = {
+    val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic
+    val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt)
+    val brokerPartitions = brokerList.zip(numPartitions)
+
+    val sortedBrokerPartitions = brokerPartitions.sortWith((id1, id2) => id1._1 < id2._1)
+
+    var brokerParts = SortedSet.empty[Partition]
+    sortedBrokerPartitions.foreach { bp =>
+      for(i <- 0 until bp._2) {
+        val bidPid = new Partition(bp._1, i)
+        brokerParts = brokerParts + bidPid
+      }
+    }
+    brokerParts
+  }  
+}
+
+/**
+ * If zookeeper-based automatic partition discovery is enabled, fetch broker info such as
+ * host, port and number of partitions from zookeeper
+ */
+private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (Int, String, Int) => Unit) extends BrokerPartitionInfo {
+  private val logger = Logger.getLogger(classOf[ZKBrokerPartitionInfo])
+  private val zkWatcherLock = new Object
+  private val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+    StringSerializer)
+  // maintain a map from topic -> list of (broker, num_partitions) from zookeeper
+  private var topicBrokerPartitions = getZKTopicPartitionInfo
+  // maintain a map from broker id to the corresponding Broker object
+  private var allBrokers = getZKBrokerInfo
+
+  // use just the brokerTopicsListener for all watchers
+  private val brokerTopicsListener = new BrokerTopicsListener(topicBrokerPartitions, allBrokers)
+  // register listener for change of topics to keep topicBrokerPartitions updated
+  zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, brokerTopicsListener)
+
+  // register listener for change of brokers for each topic to keep topicBrokerPartitions updated
+  topicBrokerPartitions.keySet.foreach {topic =>
+    zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic, brokerTopicsListener)
+    logger.debug("Registering listener on path: " + ZkUtils.BrokerTopicsPath + "/" + topic)
+  }
+
+  // register listener for new broker
+  zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerTopicsListener)
+
+  // register listener for session expired event
+  zkClient.subscribeStateChanges(new ZKSessionExpirationListener(brokerTopicsListener))
+
+  /**
+   * Return the sorted set of (brokerId, partitionId) pairs for a topic
+   * @param topic the topic for which this information is to be returned
+   * @return a sorted set of (brokerId, partitionId) pairs. Returns an empty
+   * set if no brokers are available.
+   */
+  def getBrokerPartitionInfo(topic: String): scala.collection.immutable.SortedSet[Partition] = {
+    val brokerPartitions = topicBrokerPartitions.get(topic)
+    var numBrokerPartitions = SortedSet.empty[Partition]
+    brokerPartitions match {
+      case Some(bp) =>
+        bp.size match {
+          case 0 => // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
+            numBrokerPartitions = bootstrapWithExistingBrokers(topic)
+            topicBrokerPartitions += (topic -> numBrokerPartitions)
+          case _ => numBrokerPartitions = TreeSet[Partition]() ++ bp
+        }
+      case None =>  // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
+        numBrokerPartitions = bootstrapWithExistingBrokers(topic)
+        topicBrokerPartitions += (topic -> numBrokerPartitions)
+    }
+    numBrokerPartitions
+  }
+
+  /**
+   * Look up the host and port information for the broker identified
+   * by the given broker id
+   * @param brokerId the broker for which the info is to be returned
+   * @return the Broker (host and port) for brokerId, if it is registered
+   */
+  def getBrokerInfo(brokerId: Int): Option[Broker] =  allBrokers.get(brokerId)
+
+  /**
+   * Generate a mapping from broker id to the host and port for all brokers
+   * @return mapping from id to host and port of all brokers
+   */
+  def getAllBrokerInfo: Map[Int, Broker] = allBrokers
+
+  def close = zkClient.close
+
+  private def bootstrapWithExistingBrokers(topic: String): scala.collection.immutable.SortedSet[Partition] = {
+    logger.debug("Currently, no brokers are registered under topic: " + topic)
+    logger.debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default " +
+      "number of partitions = 1")
+    val allBrokersIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath)
+    logger.trace("List of all brokers currently registered in zookeeper = " + allBrokersIds.toString)
+    // since we do not have the information about the number of partitions on these brokers, just assume a single partition
+    // i.e. pick partition 0 from each broker as a candidate
+    val numBrokerPartitions = TreeSet[Partition]() ++ allBrokersIds.map(b => new Partition(b.toInt, 0))
+    // add the rest of the available brokers with default 1 partition for this topic, so all of the brokers
+    // participate in hosting this topic.
+    logger.debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString)
+    numBrokerPartitions
+  }
+
+  /**
+   * Generate the sorted set of (brokerId, partitionId) pairs for all topics
+   * registered in zookeeper
+   * @return a mapping from topic to its sorted set of (brokerId, partitionId) pairs
+   */
+  private def getZKTopicPartitionInfo(): collection.mutable.Map[String, SortedSet[Partition]] = {
+    val brokerPartitionsPerTopic = new HashMap[String, SortedSet[Partition]]()
+    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)
+    val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath)
+    topics.foreach { topic =>
+      // find the number of broker partitions registered for this topic
+      val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic
+      val brokerList = ZkUtils.getChildrenParentMayNotExist(zkClient, brokerTopicPath)
+      val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt)
+      val brokerPartitions = brokerList.map(bid => bid.toInt).zip(numPartitions)
+      val sortedBrokerPartitions = brokerPartitions.sortWith((id1, id2) => id1._1 < id2._1)
+      logger.debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions.toString)
+
+      var brokerParts = SortedSet.empty[Partition]
+      sortedBrokerPartitions.foreach { bp =>
+        for(i <- 0 until bp._2) {
+          val bidPid = new Partition(bp._1, i)
+          brokerParts = brokerParts + bidPid
+        }
+      }
+      brokerPartitionsPerTopic += (topic -> brokerParts)
+      logger.debug("Sorted list of broker ids and partition ids on each for topic: " + topic + " = " + brokerParts.toString)
+    }
+    brokerPartitionsPerTopic
+  }
+
+  /**
+   * Generate a mapping from broker id to the corresponding Broker object for all brokers
+   * registered in zookeeper
+   * @return a mapping from brokerId to Broker (host and port)
+   */
+  private def getZKBrokerInfo(): Map[Int, Broker] = {
+    val brokers = new HashMap[Int, Broker]()
+    val allBrokerIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath).map(bid => bid.toInt)
+    allBrokerIds.foreach { bid =>
+      val brokerInfo = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)
+      brokers += (bid -> Broker.createBroker(bid, brokerInfo))
+    }
+    brokers
+  }
+
+  /**
+   * Listens to new broker registrations under a particular topic in zookeeper and
+   * keeps the related data structures updated
+   */
+  class BrokerTopicsListener(val originalBrokerTopicsPartitionsMap: collection.mutable.Map[String, SortedSet[Partition]],
+                             val originalBrokerIdMap: Map[Int, Broker]) extends IZkChildListener {
+    private var oldBrokerTopicPartitionsMap = collection.mutable.Map.empty[String, SortedSet[Partition]] ++
+                                              originalBrokerTopicsPartitionsMap
+    private var oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++ originalBrokerIdMap
+    private val logger = Logger.getLogger(classOf[BrokerTopicsListener])
+
+    logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" +
+    "/broker/topics, /broker/topics/topic, /broker/ids")
+    logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " +
+      "partition id per topic with " + oldBrokerTopicPartitionsMap.toString)
+
+    @throws(classOf[Exception])
+    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+      zkWatcherLock synchronized {
+        logger.trace("Watcher fired for path: " + parentPath)
+        import scala.collection.JavaConversions._
+
+        parentPath match {
+          case "/brokers/topics" =>        // this is a watcher for /broker/topics path
+            val updatedTopics = asBuffer(curChilds)
+            logger.debug("[BrokerTopicsListener] List of topics changed at " + parentPath + " Updated topics -> " +
+              curChilds.toString)
+            logger.debug("[BrokerTopicsListener] Old list of topics: " + oldBrokerTopicPartitionsMap.keySet.toString)
+            logger.debug("[BrokerTopicsListener] Updated list of topics: " + updatedTopics.toSet.toString)
+            val newTopics = updatedTopics.toSet &~ oldBrokerTopicPartitionsMap.keySet
+            logger.debug("[BrokerTopicsListener] List of newly registered topics: " + newTopics.toString)
+            newTopics.foreach { topic =>
+              val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic
+              val brokerList = ZkUtils.getChildrenParentMayNotExist(zkClient, brokerTopicPath)
+              processNewBrokerInExistingTopic(topic, brokerList)
+              zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic,
+                brokerTopicsListener)
+            }
+          case "/brokers/ids"    =>        // this is a watcher for /broker/ids path
+            logger.debug("[BrokerTopicsListener] List of brokers changed in the Kafka cluster " + parentPath +
+              "\t Currently registered list of brokers -> " + curChilds.toString)
+            processBrokerChange(parentPath, curChilds)
+          case _ =>
+            val pathSplits = parentPath.split("/")
+            val topic = pathSplits.last
+            if(pathSplits.length == 4 && pathSplits(2).equals("topics")) {
+              logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " +
+                " list of brokers -> " + curChilds.toString + " for topic -> " + topic)
+              processNewBrokerInExistingTopic(topic, asBuffer(curChilds))
+            }
+        }
+
+        // update the data structures tracking older state values
+        oldBrokerTopicPartitionsMap = collection.mutable.Map.empty[String, SortedSet[Partition]] ++ topicBrokerPartitions
+        oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++  allBrokers
+      }
+    }
+
+    def processBrokerChange(parentPath: String, curChilds: Seq[String]) {
+      if(parentPath.equals(ZkUtils.BrokerIdsPath)) {
+        import scala.collection.JavaConversions._
+        val updatedBrokerList = asBuffer(curChilds).map(bid => bid.toInt)
+        val newBrokers = updatedBrokerList.toSet &~ oldBrokerIdMap.keySet
+        logger.debug("[BrokerTopicsListener] List of newly registered brokers: " + newBrokers.toString)
+        newBrokers.foreach { bid =>
+          val brokerInfo = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)
+          val brokerHostPort = brokerInfo.split(":")
+          allBrokers += (bid -> new Broker(bid, brokerHostPort(1), brokerHostPort(1), brokerHostPort(2).toInt))
+          logger.debug("[BrokerTopicsListener] Invoking the callback for broker: " + bid)
+          producerCbk(bid, brokerHostPort(1), brokerHostPort(2).toInt)
+        }
+        // remove dead brokers from the in memory list of live brokers
+        val deadBrokers = oldBrokerIdMap.keySet &~ updatedBrokerList.toSet
+        logger.debug("[BrokerTopicsListener] Deleting broker ids for dead brokers: " + deadBrokers.toString)
+        deadBrokers.foreach {bid =>
+          allBrokers = allBrokers - bid
+          // also remove this dead broker from particular topics
+          topicBrokerPartitions.keySet.foreach{ topic =>
+            topicBrokerPartitions.get(topic) match {
+              case Some(oldBrokerPartitionList) =>
+                val aliveBrokerPartitionList = oldBrokerPartitionList.filter(bp => bp.brokerId != bid)
+                topicBrokerPartitions += (topic -> aliveBrokerPartitionList)
+                logger.debug("[BrokerTopicsListener] Removing dead broker ids for topic: " + topic + "\t " +
+                  "Updated list of broker id, partition id = " + aliveBrokerPartitionList.toString)
+              case None =>
+            }
+          }
+        }
+      }
+    }
+
+    /**
+     * Generate the updated set of (brokerId, partitionId) pairs for the new list of brokers
+     * registered under some topic
+     * @param topic the topic under which the list of brokers has changed
+     * @param curChilds the updated list of brokers registered under this topic
+     */
+    def processNewBrokerInExistingTopic(topic: String, curChilds: Seq[String]) = {
+      // find the old list of brokers for this topic
+      oldBrokerTopicPartitionsMap.get(topic) match {
+        case Some(brokersParts) =>
+          logger.debug("[BrokerTopicsListener] Old list of brokers: " + brokersParts.map(bp => bp.brokerId).toString)
+        case None =>
+      }
+      val updatedBrokerList = curChilds.map(b => b.toInt)
+      import ZKBrokerPartitionInfo._
+      val updatedBrokerParts:SortedSet[Partition] = getBrokerPartitions(zkClient, topic, updatedBrokerList.toList)
+      logger.debug("[BrokerTopicsListener] Currently registered list of brokers for topic: " + topic + " are " +
+        curChilds.toString)
+      // update the number of partitions on existing brokers
+      var mergedBrokerParts: SortedSet[Partition] = TreeSet[Partition]() ++ updatedBrokerParts
+      topicBrokerPartitions.get(topic) match {
+        case Some(oldBrokerParts) =>
+          logger.debug("[BrokerTopicsListener] Unregistered list of brokers for topic: " + topic + " are " +
+            oldBrokerParts.toString)
+          mergedBrokerParts = oldBrokerParts ++ updatedBrokerParts
+        case None =>
+      }
+      // keep only brokers that are alive
+      mergedBrokerParts = mergedBrokerParts.filter(bp => allBrokers.contains(bp.brokerId))
+      topicBrokerPartitions += (topic -> mergedBrokerParts)
+      logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " + mergedBrokerParts.toString)
+    }
+
+    def resetState = {
+      logger.debug("[BrokerTopicsListener] Before reseting broker topic partitions state " + oldBrokerTopicPartitionsMap.toString)
+      oldBrokerTopicPartitionsMap = collection.mutable.Map.empty[String, SortedSet[Partition]] ++ topicBrokerPartitions
+      logger.debug("[BrokerTopicsListener] After reseting broker topic partitions state " + oldBrokerTopicPartitionsMap.toString)
+      logger.debug("[BrokerTopicsListener] Before reseting broker id map state " + oldBrokerIdMap.toString)
+      oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++  allBrokers
+      logger.debug("[BrokerTopicsListener] After reseting broker id map state " + oldBrokerIdMap.toString)
+    }
+  }
+
+  /**
+   * Handles the session expiration event in zookeeper
+   */
+  class ZKSessionExpirationListener(val brokerTopicsListener: BrokerTopicsListener)
+          extends IZkStateListener {
+    @throws(classOf[Exception])
+    def handleStateChanged(state: KeeperState) {
+      // do nothing, since zkclient will do reconnect for us.
+    }
+
+    /**
+     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
+     * any ephemeral nodes here.
+     *
+     * @throws Exception
+     *             On any error.
+     */
+    @throws(classOf[Exception])
+    def handleNewSession() {
+      /**
+       *  When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
+       *  connection for us.
+       */
+      logger.info("ZK expired; release old list of broker partitions for topics ")
+      topicBrokerPartitions = getZKTopicPartitionInfo
+      allBrokers = getZKBrokerInfo
+      brokerTopicsListener.resetState
+
+      // register listener for change of brokers for each topic to keep topicBrokerPartitions updated
+      // NOTE: this is probably not required here, since reading from getZKTopicPartitionInfo() above
+      // automatically re-creates the watchers itself
+      topicBrokerPartitions.keySet.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic,
+                                                brokerTopicsListener))
+      // there is no need to re-register other listeners as they are listening on the child changes of
+      // permanent nodes
+    }
+
+  }
+
+}
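
To make the partition bookkeeping above concrete, a small self-contained sketch (with made-up counts) of how getZKTopicPartitionInfo and getBrokerPartitions expand the per-broker partition counts read from /brokers/topics/<topic>/<brokerId> into the sorted Partition set kept per topic; it relies on kafka.cluster.Partition's natural ordering, just as the file above does.

    import scala.collection.immutable.TreeSet
    import kafka.cluster.Partition

    // Example data only: brokerId -> number of partitions registered under the topic.
    val countsByBroker = List(0 -> 2, 1 -> 3)
    val brokerParts = TreeSet[Partition]() ++
      (for ((bid, numParts) <- countsByBroker; p <- 0 until numParts) yield new Partition(bid, p))
    // brokerParts now holds the five (brokerId, partitionId) pairs: (0,0), (0,1), (1,0), (1,1), (1,2)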


