Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,341 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import java.net._
+import java.io._
+import java.nio._
+import java.nio.channels._
+
+import kafka.utils._
+
+import org.apache.log4j.Logger
+import kafka.api.RequestKeys
+
+/**
+ * An NIO socket server. The thread model is
+ * 1 Acceptor thread that handles new connections
+ * N Processor threads that each have their own selector and handle all requests from their connections synchronously
+ */
+private[kafka] class SocketServer(val port: Int,
+ val numProcessorThreads: Int,
+ monitoringPeriodSecs: Int,
+ private val handlerFactory: Handler.HandlerMapping) {
+
+ private val logger = Logger.getLogger(classOf[SocketServer])
+ private val time = SystemTime
+ private val processors = new Array[Processor](numProcessorThreads)
+ private var acceptor: Acceptor = new Acceptor(port, processors)
+ val stats: SocketServerStats = new SocketServerStats(1000L * 1000L * 1000L * monitoringPeriodSecs)
+
+ /**
+ * Start the socket server
+ */
+ def startup() {
+ for(i <- 0 until numProcessorThreads) {
+ processors(i) = new Processor(handlerFactory, time, stats)
+ Utils.newThread("kafka-processor-" + i, processors(i), false).start()
+ }
+ Utils.newThread("kafka-acceptor", acceptor, false).start()
+ acceptor.awaitStartup
+ }
+
+ /**
+ * Shutdown the socket server
+ */
+ def shutdown() = {
+ acceptor.shutdown
+ for(processor <- processors)
+ processor.shutdown
+ }
+
+}
+
+/**
+ * A base class with some helper variables and methods
+ */
+private[kafka] abstract class AbstractServerThread extends Runnable {
+
+ protected val selector = Selector.open();
+ protected val logger = Logger.getLogger(getClass())
+ private val startupLatch = new CountDownLatch(1)
+ private val shutdownLatch = new CountDownLatch(1)
+ private val alive = new AtomicBoolean(false)
+
+ /**
+ * Initiates a graceful shutdown by signaling the thread to stop and waiting for the shutdown to complete
+ */
+ def shutdown(): Unit = {
+ alive.set(false)
+ selector.wakeup
+ shutdownLatch.await
+ }
+
+ /**
+ * Wait for the thread to completely start up
+ */
+ def awaitStartup(): Unit = startupLatch.await
+
+ /**
+ * Record that the thread startup is complete
+ */
+ protected def startupComplete() = {
+ alive.set(true)
+ startupLatch.countDown
+ }
+
+ /**
+ * Record that the thread shutdown is complete
+ */
+ protected def shutdownComplete() = shutdownLatch.countDown
+
+ /**
+ * Is the server still running?
+ */
+ protected def isRunning = alive.get
+
+}
+
+/**
+ * Thread that accepts and configures new connections. Only one of these is needed
+ */
+private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor]) extends AbstractServerThread {
+
+ /**
+ * Accept loop that checks for new connection attempts
+ */
+ def run() {
+ val serverChannel = ServerSocketChannel.open()
+ serverChannel.configureBlocking(false)
+ serverChannel.socket.bind(new InetSocketAddress(port))
+ serverChannel.register(selector, SelectionKey.OP_ACCEPT);
+ logger.info("Awaiting connections on port " + port)
+ startupComplete()
+
+ var currentProcessor = 0
+ while(isRunning) {
+ val ready = selector.select(500)
+ if(ready > 0) {
+ val keys = selector.selectedKeys()
+ val iter = keys.iterator()
+ while(iter.hasNext && isRunning) {
+ var key: SelectionKey = null
+ try {
+ key = iter.next
+ iter.remove()
+
+ if(key.isAcceptable)
+ accept(key, processors(currentProcessor))
+ else
+ throw new IllegalStateException("Unrecognized key state for acceptor thread.")
+
+ // round robin to the next processor thread
+ currentProcessor = (currentProcessor + 1) % processors.length
+ } catch {
+ case e: Throwable => logger.error("Error in acceptor", e)
+ }
+ }
+ }
+ }
+ logger.debug("Closing server socket and selector.")
+ Utils.swallow(logger.error, serverChannel.close())
+ Utils.swallow(logger.error, selector.close())
+ shutdownComplete()
+ }
+
+ /*
+ * Accept a new connection
+ */
+ def accept(key: SelectionKey, processor: Processor) {
+ val socketChannel = key.channel().asInstanceOf[ServerSocketChannel].accept()
+ if(logger.isDebugEnabled)
+ logger.info("Accepted connection from " + socketChannel.socket.getInetAddress() + " on " + socketChannel.socket.getLocalSocketAddress)
+ socketChannel.configureBlocking(false)
+ socketChannel.socket().setTcpNoDelay(true)
+ processor.accept(socketChannel)
+ }
+
+}
+
+/**
+ * Thread that processes all requests from a single connection. There are N of these running in parallel,
+ * each of which has its own selector
+ */
+private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
+ val time: Time,
+ val stats: SocketServerStats) extends AbstractServerThread {
+
+ private val newConnections = new ConcurrentLinkedQueue[SocketChannel]();
+ private val requestLogger = Logger.getLogger("kafka.request.logger")
+
+ override def run() {
+ startupComplete()
+ while(isRunning) {
+ // setup any new connections that have been queued up
+ configureNewConnections()
+
+ val ready = selector.select(500)
+ if(ready > 0) {
+ val keys = selector.selectedKeys()
+ val iter = keys.iterator()
+ while(iter.hasNext && isRunning) {
+ var key: SelectionKey = null
+ try {
+ key = iter.next
+ iter.remove()
+
+ if(key.isReadable)
+ read(key)
+ else if(key.isWritable)
+ write(key)
+ else if(!key.isValid)
+ close(key)
+ else
+ throw new IllegalStateException("Unrecognized key state for processor thread.")
+ } catch {
+ case e: EOFException => {
+ logger.info("Closing socket for " + channelFor(key).socket.getInetAddress + ".")
+ close(key)
+ } case e: Throwable => {
+ logger.info("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error")
+ logger.error(e, e)
+ close(key)
+ }
+ }
+ }
+ }
+ }
+ logger.debug("Closing selector.")
+ Utils.swallow(logger.info, selector.close())
+ shutdownComplete()
+ }
+
+ private def close(key: SelectionKey) {
+ val channel = key.channel.asInstanceOf[SocketChannel]
+ if(logger.isDebugEnabled)
+ logger.debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
+ Utils.swallow(logger.info, channel.socket().close())
+ Utils.swallow(logger.info, channel.close())
+ key.attach(null)
+ Utils.swallow(logger.info, key.cancel())
+ }
+
+ /**
+ * Queue up a new connection for reading
+ */
+ def accept(socketChannel: SocketChannel) {
+ newConnections.add(socketChannel)
+ selector.wakeup()
+ }
+
+ /**
+ * Register any new connections that have been queued up
+ */
+ private def configureNewConnections() {
+ while(newConnections.size() > 0) {
+ val channel = newConnections.poll()
+ if(logger.isDebugEnabled())
+ logger.debug("Listening to new connection from " + channel.socket.getRemoteSocketAddress)
+ channel.register(selector, SelectionKey.OP_READ)
+ }
+ }
+
+ /**
+ * Handle a completed request producing an optional response
+ */
+ private def handle(key: SelectionKey, request: Receive): Option[Send] = {
+ val requestTypeId = request.buffer.getShort()
+ if(requestLogger.isTraceEnabled) {
+ requestTypeId match {
+ case RequestKeys.Produce =>
+ requestLogger.trace("Handling produce request from " + channelFor(key).socket.getRemoteSocketAddress())
+ case RequestKeys.Fetch =>
+ requestLogger.trace("Handling fetch request from " + channelFor(key).socket.getRemoteSocketAddress())
+ case RequestKeys.MultiFetch =>
+ requestLogger.trace("Handling multi-fetch request from " + channelFor(key).socket.getRemoteSocketAddress())
+ case RequestKeys.MultiProduce =>
+ requestLogger.trace("Handling multi-produce request from " + channelFor(key).socket.getRemoteSocketAddress())
+ case RequestKeys.Offsets =>
+ requestLogger.trace("Handling offset request from " + channelFor(key).socket.getRemoteSocketAddress())
+ case _ => throw new InvalidRequestException("No mapping found for handler id " + requestTypeId)
+ }
+ }
+ val handler = handlerMapping(requestTypeId, request)
+ if(handler == null)
+ throw new InvalidRequestException("No handler found for request")
+ val start = time.nanoseconds
+ val maybeSend = handler(request)
+ stats.recordRequest(requestTypeId, time.nanoseconds - start)
+ maybeSend
+ }
+
+ /*
+ * Process reads from ready sockets
+ */
+ def read(key: SelectionKey) {
+ val socketChannel = channelFor(key)
+ var request = key.attachment.asInstanceOf[Receive]
+ if(key.attachment == null) {
+ request = new BoundedByteBufferReceive()
+ key.attach(request)
+ }
+ val read = request.readFrom(socketChannel)
+ stats.recordBytesRead(read)
+ if(logger.isTraceEnabled)
+ logger.trace(read + " bytes read from " + socketChannel.socket.getRemoteSocketAddress())
+ if(read < 0) {
+ close(key)
+ return
+ } else if(request.complete) {
+ val maybeResponse = handle(key, request)
+ key.attach(null)
+ // if there is a response, send it, otherwise do nothing
+ if(maybeResponse.isDefined) {
+ key.attach(maybeResponse.getOrElse(None))
+ key.interestOps(SelectionKey.OP_WRITE)
+ }
+ } else {
+ // more reading to be done
+ key.interestOps(SelectionKey.OP_READ)
+ selector.wakeup()
+ }
+ }
+
+ /*
+ * Process writes to ready sockets
+ */
+ def write(key: SelectionKey) {
+ val response = key.attachment().asInstanceOf[Send]
+ val socketChannel = channelFor(key)
+ val written = response.writeTo(socketChannel)
+ stats.recordBytesWritten(written)
+ if(logger.isTraceEnabled)
+ logger.trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress())
+ if(response.complete) {
+ key.attach(null)
+ key.interestOps(SelectionKey.OP_READ)
+ } else {
+ key.interestOps(SelectionKey.OP_WRITE)
+ selector.wakeup()
+ }
+ }
+
+ private def channelFor(key: SelectionKey) = key.channel().asInstanceOf[SocketChannel]
+
+}
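A minimal sketch of how this server would be wired up, inferred only from the usage in Processor.handle above (the handler mapping resolves a request type id and a Receive into a function from Receive to Option[Send]); the port, thread count and no-op handler are illustrative.

    // Hypothetical wiring; this handler drops every request without replying.
    val handlerMapping: Handler.HandlerMapping =
      (requestTypeId: Short, receive: Receive) => (request: Receive) => None

    val server = new SocketServer(port = 9092,
                                  numProcessorThreads = 4,
                                  monitoringPeriodSecs = 600,
                                  handlerFactory = handlerMapping)
    server.startup()    // returns once the acceptor thread is running
    // ... serve traffic ...
    server.shutdown()   // stops the acceptor and all processor threads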
Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util.concurrent.atomic._
+import javax.management._
+import kafka.utils._
+import kafka.api.RequestKeys
+
+trait SocketServerStatsMBean {
+ def getProduceRequestsPerSecond: Double
+ def getFetchRequestsPerSecond: Double
+ def getAvgProduceRequestMs: Double
+ def getMaxProduceRequestMs: Double
+ def getAvgFetchRequestMs: Double
+ def getMaxFetchRequestMs: Double
+ def getBytesReadPerSecond: Double
+ def getBytesWrittenPerSecond: Double
+ def getNumFetchRequests: Long
+ def getNumProduceRequests: Long
+}
+
+@threadsafe
+class SocketServerStats(val monitorDurationNs: Long, val time: Time) extends SocketServerStatsMBean {
+
+ def this(monitorDurationNs: Long) = this(monitorDurationNs, SystemTime)
+ val produceTimeStats = new SnapshotStats(monitorDurationNs)
+ val fetchTimeStats = new SnapshotStats(monitorDurationNs)
+ val produceBytesStats = new SnapshotStats(monitorDurationNs)
+ val fetchBytesStats = new SnapshotStats(monitorDurationNs)
+
+ def recordRequest(requestTypeId: Short, durationNs: Long) {
+ requestTypeId match {
+ case r if r == RequestKeys.Produce || r == RequestKeys.MultiProduce =>
+ produceTimeStats.recordRequestMetric(durationNs)
+ case r if r == RequestKeys.Fetch || r == RequestKeys.MultiFetch =>
+ fetchTimeStats.recordRequestMetric(durationNs)
+ case _ => /* not collecting; let go */
+ }
+ }
+
+ def recordBytesWritten(bytes: Int): Unit = fetchBytesStats.recordRequestMetric(bytes)
+
+ def recordBytesRead(bytes: Int): Unit = produceBytesStats.recordRequestMetric(bytes)
+
+ def getProduceRequestsPerSecond: Double = produceTimeStats.getRequestsPerSecond
+
+ def getFetchRequestsPerSecond: Double = fetchTimeStats.getRequestsPerSecond
+
+ def getAvgProduceRequestMs: Double = produceTimeStats.getAvgMetric / (1000.0 * 1000.0)
+
+ def getMaxProduceRequestMs: Double = produceTimeStats.getMaxMetric / (1000.0 * 1000.0)
+
+ def getAvgFetchRequestMs: Double = fetchTimeStats.getAvgMetric / (1000.0 * 1000.0)
+
+ def getMaxFetchRequestMs: Double = fetchTimeStats.getMaxMetric / (1000.0 * 1000.0)
+
+ def getBytesReadPerSecond: Double = produceBytesStats.getAvgMetric
+
+ def getBytesWrittenPerSecond: Double = fetchBytesStats.getAvgMetric
+
+ def getNumFetchRequests: Long = fetchTimeStats.getNumRequests
+
+ def getNumProduceRequests: Long = produceTimeStats.getNumRequests
+}
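A short sketch of exposing these stats over JMX, following the Utils.registerMBean pattern SyncProducerStats uses later in this patch; the MBean name is illustrative.

    import kafka.api.RequestKeys
    import kafka.utils.Utils

    val stats = new SocketServerStats(1000L * 1000L * 1000L * 600)    // 10 minute monitoring window, in nanoseconds
    Utils.registerMBean(stats, "kafka:type=kafka.SocketServerStats")  // illustrative object name
    stats.recordRequest(RequestKeys.Produce, 5L * 1000 * 1000)        // a 5 ms produce request, recorded in nanoseconds
    val avgMs = stats.getAvgProduceRequestMs                          // request times are reported back in milliseconds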
Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.nio._
+import java.nio.channels._
+import org.apache.log4j.Logger
+
+/**
+ * Represents a stateful transfer of data to or from the network
+ */
+private[network] trait Transmission {
+
+ protected val logger: Logger = Logger.getLogger(getClass())
+
+ def complete: Boolean
+
+ protected def expectIncomplete(): Unit = {
+ if(complete)
+ throw new IllegalStateException("This operation cannot be completed on a complete request.")
+ }
+
+ protected def expectComplete(): Unit = {
+ if(!complete)
+ throw new IllegalStateException("This operation cannot be completed on an incomplete request.")
+ }
+
+}
+
+/**
+ * A transmission that is being received from a channel
+ */
+private[kafka] trait Receive extends Transmission {
+
+ def buffer: ByteBuffer
+
+ def readFrom(channel: ReadableByteChannel): Int
+
+ def readCompletely(channel: ReadableByteChannel): Int = {
+ var read = 0
+ while(!complete) {
+ read = readFrom(channel)
+ if(logger.isTraceEnabled)
+ logger.trace(read + " bytes read.")
+ }
+ read
+ }
+
+}
+
+/**
+ * A transmission that is being sent out to the channel
+ */
+private[kafka] trait Send extends Transmission {
+
+ def writeTo(channel: WritableByteChannel): Int
+
+ def writeCompletely(channel: WritableByteChannel): Int = {
+ var written = 0
+ while(!complete) {
+ written = writeTo(channel)
+ if(logger.isTraceEnabled)
+ logger.trace(written + " bytes written.")
+ }
+ written
+ }
+
+}
+
+/**
+ * A set of composite sends, sent one after another
+ */
+abstract class MultiSend[S <: Send](val sends: List[S]) extends Send {
+ val expectedBytesToWrite: Int
+ private var current = sends
+ var totalWritten = 0
+
+ def writeTo(channel: WritableByteChannel): Int = {
+ expectIncomplete
+ val written = current.head.writeTo(channel)
+ totalWritten += written
+ if(current.head.complete)
+ current = current.tail
+ written
+ }
+
+ def complete: Boolean = {
+ if (current == Nil) {
+ if (totalWritten != expectedBytesToWrite)
+ logger.error("mismatch in sending bytes over socket; expected: " + expectedBytesToWrite + " actual: " + totalWritten)
+ return true
+ }
+ else
+ return false
+ }
+}
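A minimal sketch of a Send implementation against the trait above, using only what this file defines (BoundedByteBufferSend elsewhere in this patch plays the same role for real responses); the class name is illustrative.

    import java.nio.ByteBuffer
    import java.nio.channels.WritableByteChannel

    private[kafka] class SingleBufferSend(buffer: ByteBuffer) extends Send {
      def complete: Boolean = !buffer.hasRemaining

      def writeTo(channel: WritableByteChannel): Int = {
        expectIncomplete()       // guard inherited from Transmission
        channel.write(buffer)    // number of bytes written on this call
      }
    }

    // Send.writeCompletely(channel) will then drain the buffer in a loop.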
Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/package.html
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/package.html?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/package.html (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/package.html Mon Aug 1 23:41:24 2011
@@ -0,0 +1,11 @@
+The network server for kafka. No application-specific code here, just general network server stuff.
+<br>
+The classes Receive and Send encapsulate the incoming and outgoing transmission of bytes. A Handler
+is a mapping between a Receive and a Send, and represents the user's hook to add logic for mapping requests
+to actual processing code. Any uncaught exceptions in the reading or writing of the transmissions will result in
+the server logging an error and closing the offending socket. As a result it is the duty of the Handler
+implementation to catch and serialize any application-level errors that should be sent to the client.
+<br>
+This slightly lower-level interface that models sending and receiving rather than requests and responses
+is necessary in order to allow the send or receive to be overridden with a non-user-space writing of bytes
+using FileChannel.transferTo.
\ No newline at end of file
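To make the FileChannel.transferTo remark above concrete, a hedged sketch of a zero-copy Send built only on the Send trait from Transmission.scala; the class is illustrative and not part of this patch.

    import java.nio.channels.{FileChannel, WritableByteChannel}

    private[kafka] class FileSend(file: FileChannel, offset: Long, size: Long) extends Send {
      private var written = 0L
      def complete: Boolean = written >= size

      def writeTo(channel: WritableByteChannel): Int = {
        expectIncomplete()
        // transferTo moves bytes from the file to the socket inside the kernel,
        // avoiding the user-space copy a buffer-backed Send would need
        val transferred = file.transferTo(offset + written, size - written, channel)
        written += transferred
        transferred.toInt
      }
    }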
Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.producer
+
+import collection.mutable.Map
+import collection.SortedSet
+import kafka.cluster.{Broker, Partition}
+
+trait BrokerPartitionInfo {
+ /**
+ * Return the set of broker partitions registered for the given topic.
+ * @param topic the topic for which this information is to be returned
+ * @return a sorted set of broker partitions. Returns an empty set
+ * if no brokers are available.
+ */
+ def getBrokerPartitionInfo(topic: String = null): SortedSet[Partition]
+
+ /**
+ * Generate the host and port information for the broker identified
+ * by the given broker id
+ * @param brokerId the broker for which the info is to be returned
+ * @return host and port of brokerId
+ */
+ def getBrokerInfo(brokerId: Int): Option[Broker]
+
+ /**
+ * Generate a mapping from broker id to the host and port for all brokers
+ * @return mapping from id to host and port of all brokers
+ */
+ def getAllBrokerInfo: Map[Int, Broker]
+
+ /**
+ * Cleanup
+ */
+ def close
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.producer
+
+import collection.mutable.HashMap
+import collection.mutable.Map
+import org.apache.log4j.Logger
+import collection.SortedSet
+import kafka.cluster.{Broker, Partition}
+import kafka.common.InvalidConfigException
+
+private[producer] class ConfigBrokerPartitionInfo(config: ProducerConfig) extends BrokerPartitionInfo {
+ private val logger = Logger.getLogger(classOf[ConfigBrokerPartitionInfo])
+ private val brokerPartitions: SortedSet[Partition] = getConfigTopicPartitionInfo
+ private val allBrokers = getConfigBrokerInfo
+
+ /**
+ * Return the statically configured set of broker partitions; the topic is ignored
+ * @param topic ignored by this implementation
+ * @return a sorted set of broker partitions
+ */
+ def getBrokerPartitionInfo(topic: String): SortedSet[Partition] = brokerPartitions
+
+ /**
+ * Generate the host and port information for the broker identified
+ * by the given broker id
+ * @param brokerId the broker for which the info is to be returned
+ * @return host and port of brokerId
+ */
+ def getBrokerInfo(brokerId: Int): Option[Broker] = {
+ allBrokers.get(brokerId)
+ }
+
+ /**
+ * Generate a mapping from broker id to the host and port for all brokers
+ * @return mapping from id to host and port of all brokers
+ */
+ def getAllBrokerInfo: Map[Int, Broker] = allBrokers
+
+ def close {}
+
+ /**
+ * Generate the set of broker partitions for all brokers
+ * specified in the producer configuration
+ * @return sorted set of broker partitions
+ */
+ private def getConfigTopicPartitionInfo(): SortedSet[Partition] = {
+ val brokerInfoList = config.brokerPartitionInfo.split(",")
+ if(brokerInfoList.size == 0) throw new InvalidConfigException("broker.list is empty")
+ // check if each individual broker info is valid => (brokerId: brokerHost: brokerPort)
+ brokerInfoList.foreach { bInfo =>
+ val brokerInfo = bInfo.split(":")
+ if(brokerInfo.size < 3) throw new InvalidConfigException("broker.list has invalid value")
+ }
+ val brokerPartitions = brokerInfoList.map(bInfo => (bInfo.split(":").head.toInt, 1))
+ var brokerParts = SortedSet.empty[Partition]
+ brokerPartitions.foreach { bp =>
+ for(i <- 0 until bp._2) {
+ val bidPid = new Partition(bp._1, i)
+ brokerParts = brokerParts + bidPid
+ }
+ }
+ brokerParts
+ }
+
+ /**
+ * Generate the host and port information for all brokers
+ * specified in the producer configuration
+ * @return mapping from brokerId to (host, port) for all brokers
+ */
+ private def getConfigBrokerInfo(): Map[Int, Broker] = {
+ val brokerInfo = new HashMap[Int, Broker]()
+ val brokerInfoList = config.brokerPartitionInfo.split(",")
+ brokerInfoList.foreach{ bInfo =>
+ val brokerIdHostPort = bInfo.split(":")
+ brokerInfo += (brokerIdHostPort(0).toInt -> new Broker(brokerIdHostPort(0).toInt, brokerIdHostPort(1),
+ brokerIdHostPort(1), brokerIdHostPort(2).toInt))
+ }
+ brokerInfo
+ }
+
+}
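A hedged sketch of the static configuration path above; the broker ids, hosts, ports and encoder class are illustrative, and any other producer settings are omitted. Each entry in broker.list contributes a single Partition with partId 0, as getConfigTopicPartitionInfo shows.

    val props = new java.util.Properties()
    props.put("broker.list", "0:localhost:9092,1:localhost:9093")    // brokerId:host:port, comma separated
    props.put("serializer.class", "kafka.serializer.StringEncoder")  // assumed encoder class
    val info = new ConfigBrokerPartitionInfo(new ProducerConfig(props))

    info.getBrokerPartitionInfo("any-topic")   // SortedSet(Partition(0,0), Partition(1,0)); the topic is ignored
    info.getBrokerInfo(1)                      // Some(Broker(1, "localhost", "localhost", 9093))
    info.getAllBrokerInfo                      // Map(0 -> Broker(...), 1 -> Broker(...))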
Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/DefaultPartitioner.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/DefaultPartitioner.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/DefaultPartitioner.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+private[kafka] class DefaultPartitioner[T] extends Partitioner[T] {
+ private val random = new java.util.Random
+
+ def partition(key: T, numPartitions: Int): Int = {
+ if(key == null)
+ random.nextInt(numPartitions)
+ else
+ key.hashCode % numPartitions
+ }
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+import async.MissingConfigException
+import org.apache.log4j.spi.LoggingEvent
+import org.apache.log4j.{Logger, AppenderSkeleton}
+import kafka.utils.Utils
+import kafka.serializer.Encoder
+import java.util.{Properties, Date}
+import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+
+class KafkaLog4jAppender extends AppenderSkeleton {
+ var port:Int = 0
+ var host:String = null
+ var topic:String = null
+ var encoderClass:String = null
+
+ private var producer:SyncProducer = null
+ private val logger = Logger.getLogger(classOf[KafkaLog4jAppender])
+ private var encoder: Encoder[AnyRef] = null
+
+ def getPort:Int = port
+ def setPort(port: Int) = { this.port = port }
+
+ def getHost:String = host
+ def setHost(host: String) = { this.host = host }
+
+ def getTopic:String = topic
+ def setTopic(topic: String) = { this.topic = topic }
+
+ def getEncoder:String = encoderClass
+ def setEncoder(encoder: String) = { this.encoderClass = encoder }
+
+ override def activateOptions = {
+ // check for config parameter validity
+ if(host == null)
+ throw new MissingConfigException("Broker Host must be specified by the Kafka log4j appender")
+ if(port == 0)
+ throw new MissingConfigException("Broker Port must be specified by the Kafka log4j appender")
+ if(topic == null)
+ throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
+ if(encoderClass == null) {
+ logger.info("Using default encoder - kafka.producer.DefaultStringEncoder")
+ encoder = Utils.getObject("kafka.producer.DefaultStringEncoder")
+ }else // instantiate the encoder, if present
+ encoder = Utils.getObject(encoderClass)
+ val props = new Properties()
+ props.put("host", host)
+ props.put("port", port.toString)
+ producer = new SyncProducer(new SyncProducerConfig(props))
+ logger.info("Kafka producer connected to " + host + "," + port)
+ logger.info("Logging for topic: " + topic)
+ }
+
+ override def append(event: LoggingEvent) = {
+ if (logger.isDebugEnabled){
+ logger.debug("[" + new Date(event.getTimeStamp).toString + "]" + event.getRenderedMessage +
+ " for " + host + "," + port)
+ }
+ val message = encoder.toMessage(event)
+ producer.send(topic, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message))
+ }
+
+ override def close = {
+ if(!this.closed) {
+ this.closed = true
+ producer.close
+ }
+ }
+
+ override def requiresLayout: Boolean = false
+}
+
+class DefaultStringEncoder extends Encoder[LoggingEvent] {
+ override def toMessage(event: LoggingEvent):Message = new Message(event.getMessage.asInstanceOf[String].getBytes)
+}
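A hedged sketch of configuring the appender programmatically (an equivalent log4j.properties configuration works the same way); the host, port and topic values are illustrative.

    import org.apache.log4j.Logger

    val appender = new KafkaLog4jAppender
    appender.setHost("localhost")       // broker host, required
    appender.setPort(9092)              // broker port, required
    appender.setTopic("log4j-events")   // topic, required
    // no setEncoder call, so kafka.producer.DefaultStringEncoder is used
    appender.activateOptions            // validates the settings and opens the SyncProducer

    Logger.getRootLogger.addAppender(appender)
    Logger.getLogger("my.app").info("hello kafka")   // published to the log4j-events topic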
Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/Partitioner.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/Partitioner.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/Partitioner.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/Partitioner.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.producer
+
+trait Partitioner[T] {
+ /**
+ * Uses the key to calculate a partition bucket id for routing
+ * the data to the appropriate broker partition
+ * @return an integer between 0 and numPartitions-1
+ */
+ def partition(key: T, numPartitions: Int): Int
+}
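A small sketch of a custom Partitioner against the contract above; keying on a numeric member id is illustrative.

    private class MemberIdPartitioner extends Partitioner[Int] {
      // map each member id to a stable partition in [0, numPartitions - 1],
      // keeping the result non-negative even for negative keys
      def partition(key: Int, numPartitions: Int): Int =
        ((key % numPartitions) + numPartitions) % numPartitions
    }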
Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+import async.{CallbackHandler, EventHandler}
+import org.apache.log4j.Logger
+import kafka.serializer.Encoder
+import kafka.utils._
+import java.util.Properties
+import kafka.cluster.{Partition, Broker}
+import java.util.concurrent.atomic.AtomicBoolean
+import kafka.api.ProducerRequest
+import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException}
+
+class Producer[K,V](config: ProducerConfig,
+ partitioner: Partitioner[K],
+ producerPool: ProducerPool[V],
+ populateProducerPool: Boolean,
+ private var brokerPartitionInfo: BrokerPartitionInfo) /* for testing purpose only. Applications should ideally */
+ /* use the other constructor*/
+{
+ private val logger = Logger.getLogger(classOf[Producer[K, V]])
+ private val hasShutdown = new AtomicBoolean(false)
+ if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerPartitionInfo))
+ throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
+ private val random = new java.util.Random
+ // check if zookeeper based auto partition discovery is enabled
+ private val zkEnabled = Utils.propertyExists(config.zkConnect)
+ if(brokerPartitionInfo == null) {
+ zkEnabled match {
+ case true =>
+ val zkProps = new Properties()
+ zkProps.put("zk.connect", config.zkConnect)
+ zkProps.put("zk.sessiontimeout.ms", config.zkSessionTimeoutMs.toString)
+ zkProps.put("zk.connectiontimeout.ms", config.zkConnectionTimeoutMs.toString)
+ zkProps.put("zk.synctime.ms", config.zkSyncTimeMs.toString)
+ brokerPartitionInfo = new ZKBrokerPartitionInfo(new ZKConfig(zkProps), producerCbk)
+ case false =>
+ brokerPartitionInfo = new ConfigBrokerPartitionInfo(config)
+ }
+ }
+ // pool of producers, one per broker
+ if(populateProducerPool) {
+ val allBrokers = brokerPartitionInfo.getAllBrokerInfo
+ allBrokers.foreach(b => producerPool.addProducer(new Broker(b._1, b._2.host, b._2.host, b._2.port)))
+ }
+
+/**
+ * This constructor can be used when all config parameters will be specified through the
+ * ProducerConfig object
+ * @param config Producer Configuration object
+ */
+ def this(config: ProducerConfig) = this(config, Utils.getObject(config.partitionerClass),
+ new ProducerPool[V](config, Utils.getObject(config.serializerClass)), true, null)
+
+ /**
+ * This constructor can be used to provide pre-instantiated objects for all config parameters
+ * that would otherwise be instantiated via reflection. i.e. encoder, partitioner, event handler and
+ * callback handler. If you use this constructor, encoder, eventHandler, callback handler and partitioner
+ * will not be picked up from the config.
+ * @param config Producer Configuration object
+ * @param encoder Encoder used to convert an object of type V to a kafka.message.Message. If this is null it
+ * throws an InvalidConfigException
+ * @param eventHandler the class that implements kafka.producer.async.IEventHandler[T] used to
+ * dispatch a batch of produce requests, using an instance of kafka.producer.SyncProducer. If this is null, it
+ * uses the DefaultEventHandler
+ * @param cbkHandler the class that implements kafka.producer.async.CallbackHandler[T] used to inject
+ * callbacks at various stages of the kafka.producer.AsyncProducer pipeline. If this is null, the producer does
+ * not use the callback handler and hence does not invoke any callbacks
+ * @param partitioner class that implements the kafka.producer.Partitioner[K], used to supply a custom
+ * partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T]
+ * object in the send API. If this is null, producer uses DefaultPartitioner
+ */
+ def this(config: ProducerConfig,
+ encoder: Encoder[V],
+ eventHandler: EventHandler[V],
+ cbkHandler: CallbackHandler[V],
+ partitioner: Partitioner[K]) =
+ this(config, if(partitioner == null) new DefaultPartitioner[K] else partitioner,
+ new ProducerPool[V](config, encoder, eventHandler, cbkHandler), true, null)
+ /**
+ * Sends the data, partitioned by key to the topic using either the
+ * synchronous or the asynchronous producer
+ * @param producerData the producer data object that encapsulates the topic, key and message data
+ */
+ def send(producerData: ProducerData[K,V]*) {
+ val producerPoolRequests = producerData.map { pd =>
+ // find the number of broker partitions registered for this topic
+ logger.debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
+ val numBrokerPartitions = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
+ logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + numBrokerPartitions)
+ val totalNumPartitions = numBrokerPartitions.length
+ if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
+
+ var brokerIdPartition: Partition = null
+ var partition: Int = 0
+ if(zkEnabled) {
+ // get the partition id
+ val partitionId = getPartition(pd.getKey, totalNumPartitions)
+ brokerIdPartition = numBrokerPartitions(partitionId)
+ val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
+ logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
+ " on partition " + brokerIdPartition.partId)
+ partition = brokerIdPartition.partId
+ }else {
+ // randomly select a broker
+ val randomBrokerId = random.nextInt(totalNumPartitions)
+ brokerIdPartition = numBrokerPartitions(randomBrokerId)
+ val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
+
+ logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
+ " on a randomly chosen partition")
+ partition = ProducerRequest.RandomPartition
+ }
+ producerPool.getProducerPoolData(pd.getTopic,
+ new Partition(brokerIdPartition.brokerId, partition),
+ pd.getData)
+ }
+ producerPool.send(producerPoolRequests: _*)
+ }
+
+ /**
+ * Retrieves the partition id and throws an InvalidPartitionException if
+ * the value of partition is not between 0 and numPartitions-1
+ * @param key the partition key
+ * @param numPartitions the total number of available partitions
+ * @return the partition id
+ */
+ private def getPartition(key: K, numPartitions: Int): Int = {
+ if(numPartitions <= 0)
+ throw new InvalidPartitionException("Invalid number of partitions: " + numPartitions +
+ "\n Valid values are > 0")
+ val partition = if(key == null) random.nextInt(numPartitions)
+ else partitioner.partition(key , numPartitions)
+ if(partition < 0 || partition >= numPartitions)
+ throw new InvalidPartitionException("Invalid partition id : " + partition +
+ "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
+ partition
+ }
+
+ /**
+ * Callback to add a new producer to the producer pool. Used by ZKBrokerPartitionInfo
+ * on registration of a new broker in zookeeper
+ * @param bid the id of the broker
+ * @param host the hostname of the broker
+ * @param port the port of the broker
+ */
+ private def producerCbk(bid: Int, host: String, port: Int) = {
+ if(populateProducerPool) producerPool.addProducer(new Broker(bid, host, host, port))
+ else logger.debug("Skipping the callback since populateProducerPool = false")
+ }
+
+ /**
+ * Close API to close the producer pool connections to all Kafka brokers. Also closes
+ * the zookeeper client connection if one exists
+ */
+ def close() = {
+ val canShutdown = hasShutdown.compareAndSet(false, true)
+ if(canShutdown) {
+ producerPool.close
+ brokerPartitionInfo.close
+ }
+ }
+}
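A hedged, end-to-end sketch of the public API this file exposes; the broker list, topic and payloads are illustrative, and serializer.class is assumed to name a String encoder available on the classpath.

    import java.util.Properties

    val props = new Properties()
    props.put("broker.list", "0:localhost:9092")                      // static broker discovery, no zookeeper
    props.put("serializer.class", "kafka.serializer.StringEncoder")   // assumed encoder class
    val producer = new Producer[String, String](new ProducerConfig(props))

    // key-less send: a broker partition is chosen at random
    producer.send(new ProducerData[String, String]("test-topic", "hello"))

    // keyed send: with zk.connect configured instead of broker.list, the partitioner
    // would map "member-17" onto one of the discovered broker partitions
    producer.send(new ProducerData[String, String]("test-topic", "member-17", Seq("event-1", "event-2")))

    producer.close()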
Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+import async.AsyncProducerConfigShared
+import java.util.Properties
+import kafka.utils.{ZKConfig, Utils}
+import kafka.common.InvalidConfigException
+
+class ProducerConfig(val props: Properties) extends ZKConfig(props)
+ with AsyncProducerConfigShared with SyncProducerConfigShared{
+
+ /** For bypassing zookeeper based auto partition discovery, use this config *
+ * to pass in static broker and per-broker partition information. Format- *
+ * brokerid1:host1:port1, brokerid2:host2:port2*/
+ val brokerPartitionInfo = Utils.getString(props, "broker.list", null)
+ if(brokerPartitionInfo != null && Utils.getString(props, "partitioner.class", null) != null)
+ throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set")
+
+ /** the partitioner class for partitioning events amongst sub-topics */
+ val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")
+
+ /** this parameter specifies whether the messages are sent asynchronously *
+ * or not. Valid values are - async for asynchronous send *
+ * sync for synchronous send */
+ val producerType = Utils.getString(props, "producer.type", "sync")
+
+ /**
+ * This parameter allows you to specify the compression codec for all data generated *
+ * by this producer. The default is NoCompressionCodec
+ */
+ val compressionCodec = Utils.getCompressionCodec(props, "compression.codec")
+
+ /** This parameter allows you to set whether compression should be turned *
+ * on for particular topics
+ *
+ * If the compression codec is anything other than NoCompressionCodec,
+ * compression is enabled only for the specified topics, if any
+ *
+ * If the list of compressed topics is empty, then the specified compression
+ * codec is enabled for all topics
+ *
+ * If the compression codec is NoCompressionCodec, compression is disabled for all topics
+ */
+ val compressedTopics = Utils.getCSVList(Utils.getString(props, "compressed.topics", null))
+}
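A hedged example of the settings described above; the addresses and topic names are illustrative, and the numeric codec id is an assumption about Utils.getCompressionCodec rather than something visible in this diff.

    val props = new java.util.Properties()
    props.put("zk.connect", "localhost:2181")                         // zookeeper based broker discovery
    props.put("serializer.class", "kafka.serializer.StringEncoder")   // assumed encoder class
    props.put("producer.type", "async")                               // send through the AsyncProducer
    props.put("compression.codec", "1")                               // assumed to select gzip; NoCompressionCodec otherwise
    props.put("compressed.topics", "clicks,impressions")              // compress only these topics
    val config = new ProducerConfig(props)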
Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerData.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerData.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerData.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerData.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+/**
+ * Represents the data to be sent using the Producer send API
+ * @param topic the topic under which the message is to be published
+ * @param key the key used by the partitioner to pick a broker partition
+ * @param data variable length data to be published as Kafka messages under topic
+ */
+class ProducerData[K, V](private val topic: String,
+ private val key: K,
+ private val data: Seq[V]) {
+
+ def this(t: String, d: Seq[V]) = this(topic = t, key = null.asInstanceOf[K], data = d)
+
+ def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = List(d))
+
+ def getTopic: String = topic
+
+ def getKey: K = key
+
+ def getData: Seq[V] = data
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+import async._
+import java.util.Properties
+import kafka.serializer.Encoder
+import org.apache.log4j.Logger
+import java.util.concurrent.{ConcurrentMap, ConcurrentHashMap}
+import kafka.cluster.{Partition, Broker}
+import kafka.api.ProducerRequest
+import kafka.common.{UnavailableProducerException, InvalidConfigException}
+import kafka.utils.Utils
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
+
+class ProducerPool[V](private val config: ProducerConfig,
+ private val serializer: Encoder[V],
+ private val syncProducers: ConcurrentMap[Int, SyncProducer],
+ private val asyncProducers: ConcurrentMap[Int, AsyncProducer[V]],
+ private val inputEventHandler: EventHandler[V] = null,
+ private val cbkHandler: CallbackHandler[V] = null) {
+
+ private val logger = Logger.getLogger(classOf[ProducerPool[V]])
+ private var eventHandler = inputEventHandler
+ if(eventHandler == null)
+ eventHandler = new DefaultEventHandler(config, cbkHandler)
+
+ if(serializer == null)
+ throw new InvalidConfigException("serializer passed in is null!")
+
+ private var sync: Boolean = true
+ config.producerType match {
+ case "sync" =>
+ case "async" => sync = false
+ case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
+ }
+
+ def this(config: ProducerConfig, serializer: Encoder[V],
+ eventHandler: EventHandler[V], cbkHandler: CallbackHandler[V]) =
+ this(config, serializer,
+ new ConcurrentHashMap[Int, SyncProducer](),
+ new ConcurrentHashMap[Int, AsyncProducer[V]](),
+ eventHandler, cbkHandler)
+
+ def this(config: ProducerConfig, serializer: Encoder[V]) = this(config, serializer,
+ new ConcurrentHashMap[Int, SyncProducer](),
+ new ConcurrentHashMap[Int, AsyncProducer[V]](),
+ Utils.getObject(config.eventHandler),
+ Utils.getObject(config.cbkHandler))
+ /**
+ * Add a new producer, either synchronous or asynchronous, connecting
+ * to the specified broker
+ * @param broker the broker to connect to; the producer is registered
+ * in the pool under the broker's id and connects to the
+ * broker's host and port
+ */
+ def addProducer(broker: Broker) {
+ if(sync) {
+ val props = new Properties()
+ props.put("host", broker.host)
+ props.put("port", broker.port.toString)
+ props.put("buffer.size", config.bufferSize.toString)
+ props.put("connect.timeout.ms", config.connectTimeoutMs.toString)
+ props.put("reconnect.interval", config.reconnectInterval.toString)
+ val producer = new SyncProducer(new SyncProducerConfig(props))
+ logger.info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
+ syncProducers.put(broker.id, producer)
+ } else {
+ val props = new Properties()
+ props.put("host", broker.host)
+ props.put("port", broker.port.toString)
+ props.put("queue.time", config.queueTime.toString)
+ props.put("queue.size", config.queueSize.toString)
+ props.put("batch.size", config.batchSize.toString)
+ props.put("serializer.class", config.serializerClass)
+ val producer = new AsyncProducer[V](new AsyncProducerConfig(props),
+ new SyncProducer(new SyncProducerConfig(props)),
+ serializer,
+ eventHandler, config.eventHandlerProps,
+ cbkHandler, config.cbkHandlerProps)
+ producer.start
+ logger.info("Creating async producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
+ asyncProducers.put(broker.id, producer)
+ }
+ }
+
+ /**
+ * selects either a synchronous or an asynchronous producer, for
+ * the specified broker id and calls the send API on the selected
+ * producer to publish the data to the specified broker partition
+ * @param poolData the producer pool request object
+ */
+ def send(poolData: ProducerPoolData[V]*) {
+ val distinctBrokers = poolData.map(pd => pd.getBidPid.brokerId).distinct
+ var remainingRequests = poolData.toSeq
+ distinctBrokers.foreach { bid =>
+ val requestsForThisBid = remainingRequests partition (_.getBidPid.brokerId == bid)
+ remainingRequests = requestsForThisBid._2
+
+ if(sync) {
+ val producerRequests = requestsForThisBid._1.map(req => new ProducerRequest(req.getTopic, req.getBidPid.partId,
+ new ByteBufferMessageSet(compressionCodec = config.compressionCodec,
+ messages = req.getData.map(d => serializer.toMessage(d)): _*)))
+ logger.debug("Fetching sync producer for broker id: " + bid)
+ val producer = syncProducers.get(bid)
+ if(producer != null) {
+ if(producerRequests.size > 1)
+ producer.multiSend(producerRequests.toArray)
+ else
+ producer.send(topic = producerRequests(0).topic,
+ partition = producerRequests(0).partition,
+ messages = producerRequests(0).messages)
+ config.compressionCodec match {
+ case NoCompressionCodec => logger.debug("Sending message to broker " + bid)
+ case _ => logger.debug("Sending compressed messages to broker " + bid)
+ }
+ }else
+ throw new UnavailableProducerException("Producer pool has not been initialized correctly. " +
+ "Sync Producer for broker " + bid + " does not exist in the pool")
+ }else {
+ logger.debug("Fetching async producer for broker id: " + bid)
+ val producer = asyncProducers.get(bid)
+ if(producer != null) {
+ requestsForThisBid._1.foreach { req =>
+ req.getData.foreach(d => producer.send(req.getTopic, d, req.getBidPid.partId))
+ }
+ if(logger.isDebugEnabled)
+ config.compressionCodec match {
+ case NoCompressionCodec => logger.debug("Sending message")
+ case _ => logger.debug("Sending compressed messages")
+ }
+ }
+ else
+ throw new UnavailableProducerException("Producer pool has not been initialized correctly. " +
+ "Async Producer for broker " + bid + " does not exist in the pool")
+ }
+ }
+ }
+
+ /**
+ * Closes all the producers in the pool
+ */
+ def close() = {
+ config.producerType match {
+ case "sync" =>
+ logger.info("Closing all sync producers")
+ val iter = syncProducers.values.iterator
+ while(iter.hasNext)
+ iter.next.close
+ case "async" =>
+ logger.info("Closing all async producers")
+ val iter = asyncProducers.values.iterator
+ while(iter.hasNext)
+ iter.next.close
+ }
+ }
+
+ /**
+ * This constructs and returns the request object for the producer pool
+ * @param topic the topic to which the data should be published
+ * @param bidPid the broker id and partition id
+ * @param data the data to be published
+ */
+ def getProducerPoolData(topic: String, bidPid: Partition, data: Seq[V]): ProducerPoolData[V] = {
+ new ProducerPoolData[V](topic, bidPid, data)
+ }
+
+ class ProducerPoolData[V](topic: String,
+ bidPid: Partition,
+ data: Seq[V]) {
+ def getTopic: String = topic
+ def getBidPid: Partition = bidPid
+ def getData: Seq[V] = data
+ }
+}
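A compressed sketch of how Producer.send drives this pool, assuming addProducer has already been called for broker id 0; the topic and payload are illustrative.

    val request = pool.getProducerPoolData("test-topic",
                                           new Partition(0, ProducerRequest.RandomPartition),
                                           Seq("hello"))
    pool.send(request)   // routed to the sync or async producer registered for broker id 0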
Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer
+
+import java.net._
+import java.nio.channels._
+import kafka.message._
+import kafka.network._
+import kafka.utils._
+import kafka.api._
+import scala.math._
+import org.apache.log4j.{Level, Logger}
+import kafka.common.MessageSizeTooLargeException
+import java.nio.ByteBuffer
+
+object SyncProducer {
+ val RequestKey: Short = 0
+}
+
+/*
+ * Send a message set.
+ */
+@threadsafe
+class SyncProducer(val config: SyncProducerConfig) {
+
+ private val logger = Logger.getLogger(getClass())
+ private val MaxConnectBackoffMs = 60000
+ private var channel : SocketChannel = null
+ private var sentOnConnection = 0
+ private val lock = new Object()
+ @volatile
+ private var shutdown: Boolean = false
+
+ logger.debug("Instantiating Scala Sync Producer")
+
+ private def verifySendBuffer(buffer : ByteBuffer) = {
+ if (logger.isTraceEnabled) {
+ logger.trace("verifying sendbuffer of size " + buffer.limit)
+ val requestTypeId = buffer.getShort()
+ if (requestTypeId == RequestKeys.MultiProduce) {
+ try {
+ val request = MultiProducerRequest.readFrom(buffer)
+ for (produce <- request.produces) {
+ try {
+ for (messageAndOffset <- produce.messages)
+ if (!messageAndOffset.message.isValid)
+ logger.trace("topic " + produce.topic + " is invalid")
+ }
+ catch {
+ case e: Throwable =>
+ logger.trace("error iterating messages " + e + Utils.stackTrace(e))
+ }
+ }
+ }
+ catch {
+ case e: Throwable =>
+ logger.trace("error verifying sendbuffer " + e + Utils.stackTrace(e))
+ }
+ }
+ }
+ }
+
+ /**
+ * Common functionality for the public send methods
+ */
+ private def send(send: BoundedByteBufferSend) {
+ lock synchronized {
+ verifySendBuffer(send.buffer.slice)
+ val startTime = SystemTime.nanoseconds
+ getOrMakeConnection()
+
+ try {
+ send.writeCompletely(channel)
+ } catch {
+ case e : java.io.IOException =>
+ // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
+ disconnect()
+ throw e
+ case e2 =>
+ throw e2
+ }
+ // TODO: do we still need this?
+ sentOnConnection += 1
+ if(sentOnConnection >= config.reconnectInterval) {
+ disconnect()
+ channel = connect()
+ sentOnConnection = 0
+ }
+ val endTime = SystemTime.nanoseconds
+ SyncProducerStats.recordProduceRequest(endTime - startTime)
+ }
+ }
+
+ /**
+ * Send a message
+ */
+ def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
+ verifyMessageSize(messages)
+ val setSize = messages.sizeInBytes.asInstanceOf[Int]
+ if(logger.isTraceEnabled)
+ logger.trace("Got message set with " + setSize + " bytes to send")
+ send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages)))
+ }
+
+ def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, ProducerRequest.RandomPartition, messages)
+
+ def multiSend(produces: Array[ProducerRequest]) {
+ for (request <- produces)
+ verifyMessageSize(request.messages)
+ val setSize = produces.foldLeft(0L)(_ + _.messages.sizeInBytes)
+ if(logger.isTraceEnabled)
+ logger.trace("Got multi message sets with " + setSize + " bytes to send")
+ send(new BoundedByteBufferSend(new MultiProducerRequest(produces)))
+ }
+
+ def close() = {
+ lock synchronized {
+ disconnect()
+ shutdown = true
+ }
+ }
+
+ private def verifyMessageSize(messages: ByteBufferMessageSet) {
+ for (messageAndOffset <- messages)
+ if (messageAndOffset.message.payloadSize > config.maxMessageSize)
+ throw new MessageSizeTooLargeException
+ }
+
+ /**
+ * Disconnect from the current channel, closing the connection.
+ * Side effect: the channel field is set to null on successful disconnect
+ */
+ private def disconnect() {
+ try {
+ if(channel != null) {
+ logger.info("Disconnecting from " + config.host + ":" + config.port)
+ Utils.swallow(logger.warn, channel.close())
+ Utils.swallow(logger.warn, channel.socket.close())
+ channel = null
+ }
+ } catch {
+ case e: Exception => logger.error("Error on disconnect: ", e)
+ }
+ }
+
+ private def connect(): SocketChannel = {
+ var channel: SocketChannel = null
+ var connectBackoffMs = 1
+ val beginTimeMs = SystemTime.milliseconds
+ while(channel == null && !shutdown) {
+ try {
+ channel = SocketChannel.open()
+ channel.socket.setSendBufferSize(config.bufferSize)
+ channel.configureBlocking(true)
+ channel.socket.setSoTimeout(config.socketTimeoutMs)
+ channel.socket.setKeepAlive(true)
+ channel.connect(new InetSocketAddress(config.host, config.port))
+ logger.info("Connected to " + config.host + ":" + config.port + " for producing")
+ }
+ catch {
+ case e: Exception => {
+ disconnect()
+ val endTimeMs = SystemTime.milliseconds
+ if ( (endTimeMs - beginTimeMs + connectBackoffMs) > config.connectTimeoutMs)
+ {
+ logger.error("Producer connection timing out after " + config.connectTimeoutMs + " ms", e)
+ throw e
+ }
+ logger.error("Connection attempt failed, next attempt in " + connectBackoffMs + " ms", e)
+ SystemTime.sleep(connectBackoffMs)
+ connectBackoffMs = min(10 * connectBackoffMs, MaxConnectBackoffMs)
+ }
+ }
+ }
+ channel
+ }
+
+ private def getOrMakeConnection() {
+ if(channel == null) {
+ channel = connect()
+ }
+ }
+}
+
+trait SyncProducerStatsMBean {
+ def getProduceRequestsPerSecond: Double
+ def getAvgProduceRequestMs: Double
+ def getMaxProduceRequestMs: Double
+ def getNumProduceRequests: Long
+}
+
+@threadsafe
+class SyncProducerStats extends SyncProducerStatsMBean {
+ private val produceRequestStats = new SnapshotStats
+
+ def recordProduceRequest(requestNs: Long) = produceRequestStats.recordRequestMetric(requestNs)
+
+ def getProduceRequestsPerSecond: Double = produceRequestStats.getRequestsPerSecond
+
+ def getAvgProduceRequestMs: Double = produceRequestStats.getAvgMetric / (1000.0 * 1000.0)
+
+ def getMaxProduceRequestMs: Double = produceRequestStats.getMaxMetric / (1000.0 * 1000.0)
+
+ def getNumProduceRequests: Long = produceRequestStats.getNumRequests
+}
+
+object SyncProducerStats {
+ private val logger = Logger.getLogger(getClass())
+ private val kafkaProducerstatsMBeanName = "kafka:type=kafka.KafkaProducerStats"
+ private val stats = new SyncProducerStats
+ Utils.swallow(logger.warn, Utils.registerMBean(stats, kafkaProducerstatsMBeanName))
+
+ def recordProduceRequest(requestMs: Long) = stats.recordProduceRequest(requestMs)
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+import kafka.utils.Utils
+import java.util.Properties
+import kafka.message.{CompressionUtils, CompressionCodec}
+
+class SyncProducerConfig(val props: Properties) extends SyncProducerConfigShared {
+ /** the broker to which the producer sends events */
+ val host = Utils.getString(props, "host")
+
+ /** the port on which the broker is running */
+ val port = Utils.getInt(props, "port")
+}
+
+trait SyncProducerConfigShared {
+ val props: Properties
+
+ val bufferSize = Utils.getInt(props, "buffer.size", 100*1024)
+
+ val connectTimeoutMs = Utils.getInt(props, "connect.timeout.ms", 5000)
+
+ /** the socket timeout for network requests */
+ val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", 30000)
+
+ val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
+
+ val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,359 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.producer
+
+import kafka.utils.{StringSerializer, ZkUtils, ZKConfig}
+import collection.mutable.HashMap
+import collection.mutable.Map
+import org.apache.log4j.Logger
+import collection.immutable.TreeSet
+import kafka.cluster.{Broker, Partition}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
+import collection.SortedSet
+
+private[producer] object ZKBrokerPartitionInfo {
+ /**
+ * Generate the set of (brokerId, partitionId) pairs for the list of brokers
+ * specified
+ * @param zkClient the zookeeper client used to read the partition counts
+ * @param topic the topic to which the brokers have registered
+ * @param brokerList the list of brokers for which the partition info is to be generated
+ * @return a sorted set of Partition(brokerId, partitionId) for the brokers in brokerList
+ */
+ private def getBrokerPartitions(zkClient: ZkClient, topic: String, brokerList: List[Int]): SortedSet[Partition] = {
+ val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic
+ val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt)
+ val brokerPartitions = brokerList.zip(numPartitions)
+
+ val sortedBrokerPartitions = brokerPartitions.sortWith((id1, id2) => id1._1 < id2._1)
+
+ var brokerParts = SortedSet.empty[Partition]
+ sortedBrokerPartitions.foreach { bp =>
+ for(i <- 0 until bp._2) {
+ val bidPid = new Partition(bp._1, i)
+ brokerParts = brokerParts + bidPid
+ }
+ }
+ brokerParts
+ }
+}
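
As a concrete illustration of the expansion performed by getBrokerPartitions (not part of
this patch): a broker list of (broker 0 -> 2 partitions, broker 1 -> 1 partition) yields
the sorted set Partition(0,0), Partition(0,1), Partition(1,0). The same shape in plain Scala:

    val brokerPartitions = List((0, 2), (1, 1))            // (brokerId, numPartitions) read from ZK
    val expanded = brokerPartitions.flatMap { case (bid, n) =>
      (0 until n).map(partId => (bid, partId))             // one entry per partition on that broker
    }
    // expanded == List((0,0), (0,1), (1,0))
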
+
+/**
+ * If zookeeper-based automatic partition discovery is enabled, fetch broker info such as
+ * host, port and number of partitions from zookeeper
+ */
+private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (Int, String, Int) => Unit) extends BrokerPartitionInfo {
+ private val logger = Logger.getLogger(classOf[ZKBrokerPartitionInfo])
+ private val zkWatcherLock = new Object
+ private val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+ StringSerializer)
+ // maintain a map from topic -> sorted set of (brokerId, partitionId) pairs from zookeeper
+ private var topicBrokerPartitions = getZKTopicPartitionInfo
+ // maintain a map from broker id to the corresponding Broker object
+ private var allBrokers = getZKBrokerInfo
+
+ // use just the brokerTopicsListener for all watchers
+ private val brokerTopicsListener = new BrokerTopicsListener(topicBrokerPartitions, allBrokers)
+ // register listener for change of topics to keep topicBrokerPartitions updated
+ zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, brokerTopicsListener)
+
+ // register listener for change of brokers for each topic to keep topicBrokerPartitions updated
+ topicBrokerPartitions.keySet.foreach {topic =>
+ zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic, brokerTopicsListener)
+ logger.debug("Registering listener on path: " + ZkUtils.BrokerTopicsPath + "/" + topic)
+ }
+
+ // register listener for new broker
+ zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerTopicsListener)
+
+ // register listener for session expired event
+ zkClient.subscribeStateChanges(new ZKSessionExpirationListener(brokerTopicsListener))
+
+ /**
+ * Return the set of (brokerId, partitionId) pairs for a topic
+ * @param topic the topic for which this information is to be returned
+ * @return a sorted set of Partition(brokerId, partitionId). Returns an empty
+ * set if no brokers are available.
+ */
+ def getBrokerPartitionInfo(topic: String): scala.collection.immutable.SortedSet[Partition] = {
+ val brokerPartitions = topicBrokerPartitions.get(topic)
+ var numBrokerPartitions = SortedSet.empty[Partition]
+ brokerPartitions match {
+ case Some(bp) =>
+ bp.size match {
+ case 0 => // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
+ numBrokerPartitions = bootstrapWithExistingBrokers(topic)
+ topicBrokerPartitions += (topic -> numBrokerPartitions)
+ case _ => numBrokerPartitions = TreeSet[Partition]() ++ bp
+ }
+ case None => // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
+ numBrokerPartitions = bootstrapWithExistingBrokers(topic)
+ topicBrokerPartitions += (topic -> numBrokerPartitions)
+ }
+ numBrokerPartitions
+ }
+
+ /**
+ * Generate the host and port information for the broker identified
+ * by the given broker id
+ * @param brokerId the broker for which the info is to be returned
+ * @return the Broker (host and port) for brokerId, if it is registered; None otherwise
+ */
+ def getBrokerInfo(brokerId: Int): Option[Broker] = allBrokers.get(brokerId)
+
+ /**
+ * Generate a mapping from broker id to the host and port for all brokers
+ * @return mapping from id to host and port of all brokers
+ */
+ def getAllBrokerInfo: Map[Int, Broker] = allBrokers
+
+ def close = zkClient.close
+
+ private def bootstrapWithExistingBrokers(topic: String): scala.collection.immutable.SortedSet[Partition] = {
+ logger.debug("Currently, no brokers are registered under topic: " + topic)
+ logger.debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default " +
+ "number of partitions = 1")
+ val allBrokersIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath)
+ logger.trace("List of all brokers currently registered in zookeeper = " + allBrokersIds.toString)
+ // since we do not have the information about number of partitions on these brokers, just assume single partition
+ // i.e. pick partition 0 from each broker as a candidate
+ val numBrokerPartitions = TreeSet[Partition]() ++ allBrokersIds.map(b => new Partition(b.toInt, 0))
+ // add the rest of the available brokers with default 1 partition for this topic, so all of the brokers
+ // participate in hosting this topic.
+ logger.debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString)
+ numBrokerPartitions
+ }
+
+ /**
+ * Generate the set of (brokerId, partitionId) pairs for all topics
+ * registered in zookeeper
+ * @return a mapping from topic to a sorted set of Partition(brokerId, partitionId)
+ */
+ private def getZKTopicPartitionInfo(): collection.mutable.Map[String, SortedSet[Partition]] = {
+ val brokerPartitionsPerTopic = new HashMap[String, SortedSet[Partition]]()
+ ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)
+ val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath)
+ topics.foreach { topic =>
+ // find the number of broker partitions registered for this topic
+ val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic
+ val brokerList = ZkUtils.getChildrenParentMayNotExist(zkClient, brokerTopicPath)
+ val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt)
+ val brokerPartitions = brokerList.map(bid => bid.toInt).zip(numPartitions)
+ val sortedBrokerPartitions = brokerPartitions.sortWith((id1, id2) => id1._1 < id2._1)
+ logger.debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions.toString)
+
+ var brokerParts = SortedSet.empty[Partition]
+ sortedBrokerPartitions.foreach { bp =>
+ for(i <- 0 until bp._2) {
+ val bidPid = new Partition(bp._1, i)
+ brokerParts = brokerParts + bidPid
+ }
+ }
+ brokerPartitionsPerTopic += (topic -> brokerParts)
+ logger.debug("Sorted list of broker ids and partition ids on each for topic: " + topic + " = " + brokerParts.toString)
+ }
+ brokerPartitionsPerTopic
+ }
+
+ /**
+ * Generate a mapping from broker id to the corresponding Broker object for all brokers
+ * registered in zookeeper
+ * @return a mapping from brokerId to Broker (host and port)
+ */
+ private def getZKBrokerInfo(): Map[Int, Broker] = {
+ val brokers = new HashMap[Int, Broker]()
+ val allBrokerIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath).map(bid => bid.toInt)
+ allBrokerIds.foreach { bid =>
+ val brokerInfo = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)
+ brokers += (bid -> Broker.createBroker(bid, brokerInfo))
+ }
+ brokers
+ }
+
+ /**
+ * Listens for new topic and broker registrations in zookeeper and keeps the
+ * related data structures updated
+ */
+ class BrokerTopicsListener(val originalBrokerTopicsPartitionsMap: collection.mutable.Map[String, SortedSet[Partition]],
+ val originalBrokerIdMap: Map[Int, Broker]) extends IZkChildListener {
+ private var oldBrokerTopicPartitionsMap = collection.mutable.Map.empty[String, SortedSet[Partition]] ++
+ originalBrokerTopicsPartitionsMap
+ private var oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++ originalBrokerIdMap
+ private val logger = Logger.getLogger(classOf[BrokerTopicsListener])
+
+ logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" +
+ "/broker/topics, /broker/topics/topic, /broker/ids")
+ logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " +
+ "partition id per topic with " + oldBrokerTopicPartitionsMap.toString)
+
+ @throws(classOf[Exception])
+ def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+ zkWatcherLock synchronized {
+ logger.trace("Watcher fired for path: " + parentPath)
+ import scala.collection.JavaConversions._
+
+ parentPath match {
+ case "/brokers/topics" => // this is a watcher for /broker/topics path
+ val updatedTopics = asBuffer(curChilds)
+ logger.debug("[BrokerTopicsListener] List of topics changed at " + parentPath + " Updated topics -> " +
+ curChilds.toString)
+ logger.debug("[BrokerTopicsListener] Old list of topics: " + oldBrokerTopicPartitionsMap.keySet.toString)
+ logger.debug("[BrokerTopicsListener] Updated list of topics: " + updatedTopics.toSet.toString)
+ val newTopics = updatedTopics.toSet &~ oldBrokerTopicPartitionsMap.keySet
+ logger.debug("[BrokerTopicsListener] List of newly registered topics: " + newTopics.toString)
+ newTopics.foreach { topic =>
+ val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic
+ val brokerList = ZkUtils.getChildrenParentMayNotExist(zkClient, brokerTopicPath)
+ processNewBrokerInExistingTopic(topic, brokerList)
+ zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic,
+ brokerTopicsListener)
+ }
+ case "/brokers/ids" => // this is a watcher for /broker/ids path
+ logger.debug("[BrokerTopicsListener] List of brokers changed in the Kafka cluster " + parentPath +
+ "\t Currently registered list of brokers -> " + curChilds.toString)
+ processBrokerChange(parentPath, curChilds)
+ case _ =>
+ val pathSplits = parentPath.split("/")
+ val topic = pathSplits.last
+ if(pathSplits.length == 4 && pathSplits(2).equals("topics")) {
+ logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " +
+ " list of brokers -> " + curChilds.toString + " for topic -> " + topic)
+ processNewBrokerInExistingTopic(topic, asBuffer(curChilds))
+ }
+ }
+
+ // update the data structures tracking older state values
+ oldBrokerTopicPartitionsMap = collection.mutable.Map.empty[String, SortedSet[Partition]] ++ topicBrokerPartitions
+ oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++ allBrokers
+ }
+ }
+
+ def processBrokerChange(parentPath: String, curChilds: Seq[String]) {
+ if(parentPath.equals(ZkUtils.BrokerIdsPath)) {
+ import scala.collection.JavaConversions._
+ val updatedBrokerList = asBuffer(curChilds).map(bid => bid.toInt)
+ val newBrokers = updatedBrokerList.toSet &~ oldBrokerIdMap.keySet
+ logger.debug("[BrokerTopicsListener] List of newly registered brokers: " + newBrokers.toString)
+ newBrokers.foreach { bid =>
+ val brokerInfo = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)
+ val brokerHostPort = brokerInfo.split(":")
+ allBrokers += (bid -> new Broker(bid, brokerHostPort(1), brokerHostPort(1), brokerHostPort(2).toInt))
+ logger.debug("[BrokerTopicsListener] Invoking the callback for broker: " + bid)
+ producerCbk(bid, brokerHostPort(1), brokerHostPort(2).toInt)
+ }
+ // remove dead brokers from the in memory list of live brokers
+ val deadBrokers = oldBrokerIdMap.keySet &~ updatedBrokerList.toSet
+ logger.debug("[BrokerTopicsListener] Deleting broker ids for dead brokers: " + deadBrokers.toString)
+ deadBrokers.foreach {bid =>
+ allBrokers = allBrokers - bid
+ // also remove this dead broker from particular topics
+ topicBrokerPartitions.keySet.foreach{ topic =>
+ topicBrokerPartitions.get(topic) match {
+ case Some(oldBrokerPartitionList) =>
+ val aliveBrokerPartitionList = oldBrokerPartitionList.filter(bp => bp.brokerId != bid)
+ topicBrokerPartitions += (topic -> aliveBrokerPartitionList)
+ logger.debug("[BrokerTopicsListener] Removing dead broker ids for topic: " + topic + "\t " +
+ "Updated list of broker id, partition id = " + aliveBrokerPartitionList.toString)
+ case None =>
+ }
+ }
+ }
+ }
+ }
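
The new/dead broker bookkeeping above is just two set differences; a small standalone
example of the same logic (illustrative only, not part of this patch):

    val oldBrokerIds     = Set(0, 1, 2)                   // brokers we knew about
    val updatedBrokerIds = Set(1, 2, 3)                   // current children of /brokers/ids
    val newBrokers  = updatedBrokerIds &~ oldBrokerIds    // Set(3): read its info and fire producerCbk
    val deadBrokers = oldBrokerIds &~ updatedBrokerIds    // Set(0): drop from allBrokers and from every topic
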
+
+ /**
+ * Generate the updated set of (brokerId, partitionId) pairs for the new list of brokers
+ * registered under some topic
+ * @param topic the topic under which the brokers have changed
+ * @param curChilds the updated list of brokers registered under the topic
+ */
+ def processNewBrokerInExistingTopic(topic: String, curChilds: Seq[String]) = {
+ // find the old list of brokers for this topic
+ oldBrokerTopicPartitionsMap.get(topic) match {
+ case Some(brokersParts) =>
+ logger.debug("[BrokerTopicsListener] Old list of brokers: " + brokersParts.map(bp => bp.brokerId).toString)
+ case None =>
+ }
+ val updatedBrokerList = curChilds.map(b => b.toInt)
+ import ZKBrokerPartitionInfo._
+ val updatedBrokerParts:SortedSet[Partition] = getBrokerPartitions(zkClient, topic, updatedBrokerList.toList)
+ logger.debug("[BrokerTopicsListener] Currently registered list of brokers for topic: " + topic + " are " +
+ curChilds.toString)
+ // update the number of partitions on existing brokers
+ var mergedBrokerParts: SortedSet[Partition] = TreeSet[Partition]() ++ updatedBrokerParts
+ topicBrokerPartitions.get(topic) match {
+ case Some(oldBrokerParts) =>
+ logger.debug("[BrokerTopicsListener] Unregistered list of brokers for topic: " + topic + " are " +
+ oldBrokerParts.toString)
+ mergedBrokerParts = oldBrokerParts ++ updatedBrokerParts
+ case None =>
+ }
+ // keep only brokers that are alive
+ mergedBrokerParts = mergedBrokerParts.filter(bp => allBrokers.contains(bp.brokerId))
+ topicBrokerPartitions += (topic -> mergedBrokerParts)
+ logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " + mergedBrokerParts.toString)
+ }
+
+ def resetState = {
+ logger.debug("[BrokerTopicsListener] Before reseting broker topic partitions state " + oldBrokerTopicPartitionsMap.toString)
+ oldBrokerTopicPartitionsMap = collection.mutable.Map.empty[String, SortedSet[Partition]] ++ topicBrokerPartitions
+ logger.debug("[BrokerTopicsListener] After reseting broker topic partitions state " + oldBrokerTopicPartitionsMap.toString)
+ logger.debug("[BrokerTopicsListener] Before reseting broker id map state " + oldBrokerIdMap.toString)
+ oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++ allBrokers
+ logger.debug("[BrokerTopicsListener] After reseting broker id map state " + oldBrokerIdMap.toString)
+ }
+ }
+
+ /**
+ * Handles the session expiration event in zookeeper
+ */
+ class ZKSessionExpirationListener(val brokerTopicsListener: BrokerTopicsListener)
+ extends IZkStateListener {
+ @throws(classOf[Exception])
+ def handleStateChanged(state: KeeperState) {
+ // do nothing, since zkclient will do reconnect for us.
+ }
+
+ /**
+ * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
+ * any ephemeral nodes here.
+ *
+ * @throws Exception
+ * On any error.
+ */
+ @throws(classOf[Exception])
+ def handleNewSession() {
+ /**
+ * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
+ * connection for us.
+ */
+ logger.info("ZK expired; release old list of broker partitions for topics ")
+ topicBrokerPartitions = getZKTopicPartitionInfo
+ allBrokers = getZKBrokerInfo
+ brokerTopicsListener.resetState
+
+ // register listener for change of brokers for each topic to keep topicsBrokerPartitions updated
+ // NOTE: this is probably not required here, since reading from getZKTopicPartitionInfo() above
+ // automatically recreates the watchers itself
+ topicBrokerPartitions.keySet.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic,
+ brokerTopicsListener))
+ // there is no need to re-register other listeners as they are listening on the child changes of
+ // permanent nodes
+ }
+
+ }
+
+}
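
A minimal wiring sketch for the class above (illustrative only, not part of this patch).
The "zk.connect" property key and the ZKConfig(props) constructor are assumptions based
on conventions used elsewhere in this tree; the callback signature (brokerId, host, port)
comes from the producerCbk parameter above.

    import java.util.Properties
    import kafka.utils.ZKConfig

    val props = new Properties
    props.put("zk.connect", "localhost:2181")    // assumed property key for the ZK connect string
    val brokerInfo = new ZKBrokerPartitionInfo(new ZKConfig(props),
      (brokerId, host, port) => println("broker " + brokerId + " registered at " + host + ":" + port))
    val partitions = brokerInfo.getBrokerPartitionInfo("test-topic")   // SortedSet[Partition]
    brokerInfo.close
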