kafka-commits mailing list archives

From jun...@apache.org
Subject svn commit: r1372724 [1/2] - in /incubator/kafka/branches/0.8: contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/producer/ core/src/main/scala/kafka/producer/async/ core/src/main/scala/k...
Date Tue, 14 Aug 2012 04:17:29 GMT
Author: junrao
Date: Tue Aug 14 04:17:27 2012
New Revision: 1372724

URL: http://svn.apache.org/viewvc?rev=1372724&view=rev
Log:
remove ZK dependency from the producer; patched by Yang Ye; reviewed by Jun Rao; KAFKA-369
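
For context, a minimal sketch of producer setup after this change, with the static
broker.list bootstrap replacing zk.connect (broker hosts, topic, and payload are
illustrative):

    import java.util.Properties
    import kafka.message.Message
    import kafka.producer.{Producer, ProducerConfig, ProducerData}

    val props = new Properties()
    // Bootstrap from a static broker list; the producer no longer talks to ZooKeeper.
    props.put("broker.list", "broker1:9092,broker2:9092") // illustrative hosts
    props.put("producer.type", "sync")
    val producer = new Producer[String, Message](new ProducerConfig(props))
    producer.send(new ProducerData[String, Message]("my-topic", "my-topic", new Message("hello".getBytes)))
    producer.close()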

Modified:
    incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/producer.properties

Modified: incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java Tue Aug 14 04:17:27 2012
@@ -125,22 +125,7 @@ public class KafkaOutputFormat<W extends
     props.setProperty("max.message.size", Integer.toString(maxSize));
     props.setProperty("compression.codec", Integer.toString(compressionCodec));
 
-    if (uri.getScheme().equals("kafka+zk")) {
-      // Software load balancer:
-      //  URL: kafka+zk://<zk connect path>#<kafka topic>
-      //  e.g. kafka+zk://kafka-zk:2181/kafka#foobar
-
-      String zkConnect = uri.getAuthority() + uri.getPath();
-
-      props.setProperty("zk.connect", zkConnect);
-      job.set("kafka.zk.connect", zkConnect);
-
-      topic = uri.getFragment();
-      if (topic == null)
-        throw new KafkaException("no topic specified in kafka uri fragment");
-
-      log.info(String.format("using kafka zk.connect %s (topic %s)", zkConnect, topic));
-    } else if (uri.getScheme().equals("kafka")) {
+    if (uri.getScheme().equals("kafka")) {
       // using the legacy direct broker list
       // URL: kafka://<kafka host>/<topic>
       // e.g. kafka://kafka-server:9000,kafka-server2:9000/foobar
@@ -167,7 +152,7 @@ public class KafkaOutputFormat<W extends
       job.set("kafka.output.topic", topic);
       log.info(String.format("using kafka broker %s (topic %s)", brokerList, topic));
     } else
-      throw new KafkaException("missing scheme from kafka uri (must be kafka:// or kafka+zk://)");
+      throw new KafkaException("missing scheme from kafka uri (must be kafka://)");
 
     Producer<Integer, Message> producer = new Producer<Integer, Message>(new ProducerConfig(props));
     return new KafkaRecordWriter<W>(producer, topic, queueSize);

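With the kafka+zk scheme gone, the output URI carries the broker list directly. A rough
sketch of how such a URI decomposes (host names are illustrative; java.net.URI accepts
the comma-separated list as a registry-based authority):

    import java.net.URI

    // kafka://<broker1:port,broker2:port>/<topic>
    val uri = new URI("kafka://kafka-server:9000,kafka-server2:9000/foobar")
    val brokerList = uri.getAuthority          // "kafka-server:9000,kafka-server2:9000"
    val topic = uri.getPath.stripPrefix("/")   // "foobar"
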
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala Tue Aug 14 04:17:27 2012
@@ -154,7 +154,7 @@ object PartitionMetadata {
   }
 }
 
-case class PartitionMetadata(partitionId: Int, leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty,
+case class PartitionMetadata(partitionId: Int, val leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty,
                              errorCode: Short = ErrorMapping.NoError, logMetadata: Option[LogMetadata] = None) {
   def sizeInBytes: Int = {
     var size: Int = 2 /* error code */ + 4 /* partition id */ + 1 /* if leader exists*/

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Tue Aug 14 04:17:27 2012
@@ -19,13 +19,17 @@ package kafka.producer
 import collection.mutable.HashMap
 import kafka.api.{TopicMetadataRequest, TopicMetadata}
 import kafka.common.KafkaException
-import kafka.utils.Logging
+import kafka.utils.{Logging, Utils}
+import kafka.common.ErrorMapping
 import kafka.cluster.{Replica, Partition}
-import kafka.common.{LeaderNotAvailableException, ErrorMapping, UnknownTopicException}
 
-class BrokerPartitionInfo(producerPool: ProducerPool) extends Logging {
-  val topicPartitionInfo = new HashMap[String, TopicMetadata]()
-  val zkClient = producerPool.getZkClient
+
+class BrokerPartitionInfo(producerConfig: ProducerConfig,
+                          producerPool: ProducerPool,
+                          topicPartitionInfo: HashMap[String, TopicMetadata])
+        extends Logging {
+  val brokerList = producerConfig.brokerList
+  val brokers = Utils.getAllBrokersFromBrokerList(brokerList)
 
   /**
    * Return a sequence of (brokerId, numPartitions).
@@ -69,33 +73,43 @@ class BrokerPartitionInfo(producerPool: 
   * It updates the cache by issuing a get topic metadata request to the configured brokers, trying each in turn.
   * @param topics the topics for which the metadata is to be fetched
    */
-  def updateInfo(topics: Seq[String] = Seq.empty[String]) = {
-    val producer = producerPool.getAnyProducer
-    val topicList = if(topics.size > 0) topics else topicPartitionInfo.keySet.toList
-    topicList.foreach { topic =>
-      info("Fetching metadata for topic %s".format(topic))
-      val topicMetadataRequest = new TopicMetadataRequest(List(topic))
-      var topicMetaDataResponse: Seq[TopicMetadata] = Nil
+  def updateInfo(topics: Seq[String]) = {
+    var fetchMetaDataSucceeded: Boolean = false
+    var i: Int = 0
+    val topicMetadataRequest = new TopicMetadataRequest(topics)
+    var topicMetaDataResponse: Seq[TopicMetadata] = Nil
+    var t: Throwable = null
+    while(i < brokers.size && !fetchMetaDataSucceeded) {
+      val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
+      info("Fetching metadata for topic %s".format(brokers))
       try {
         topicMetaDataResponse = producer.send(topicMetadataRequest)
-        // throw topic specific exception
-        topicMetaDataResponse.foreach(metadata => ErrorMapping.maybeThrowException(metadata.errorCode))
+        fetchMetaDataSucceeded = true
         // throw partition specific exception
-        topicMetaDataResponse.foreach(metadata =>
-          metadata.partitionsMetadata.foreach(partitionMetadata => ErrorMapping.maybeThrowException(partitionMetadata.errorCode)))
-      }catch {
-        case te: UnknownTopicException => throw te
-        case e: LeaderNotAvailableException => throw e
-        case oe => warn("Ignoring non leader related error while fetching metadata", oe)  // swallow non leader related errors
-      }
-      val topicMetadata:Option[TopicMetadata] = if(topicMetaDataResponse.size > 0) Some(topicMetaDataResponse.head) else None
-      topicMetadata match {
-        case Some(metadata) =>
-          info("Fetched metadata for topics %s".format(topic))
-          topicMetadata.foreach(metadata => trace("Metadata for topic %s is %s".format(metadata.topic, metadata.toString)))
-          topicPartitionInfo += (topic -> metadata)
-        case None =>
+        topicMetaDataResponse.foreach(tmd =>{
+          trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
+          if(tmd.errorCode == ErrorMapping.NoError){
+            topicPartitionInfo.put(tmd.topic, tmd)
+          } else
+            warn("Metadata for topic [%s] is erronous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
+          tmd.partitionsMetadata.foreach(pmd =>{
+            if (pmd.errorCode != ErrorMapping.NoError){
+              debug("Metadata for topic partition [%s, %d] is errornous: [%s]".format(tmd.topic, pmd.partitionId, pmd), ErrorMapping.exceptionFor(pmd.errorCode))
+            }
+          })
+        })
+        producerPool.updateProducer(topicMetaDataResponse)
+      } catch {
+        case e =>
+          warn("fetching broker partition metadata for topics [%s] from broker [%s] failed".format(topics, brokers(i).toString), e)
+          t = e
+      } finally {
+        i = i + 1
+        producer.close()
       }
     }
+    if(!fetchMetaDataSucceeded){
+      throw new KafkaException("fetching broker partition metadata for topics [%s] from brokers [%s] failed".format(topics, brokers), t)
+    }
   }
 }

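The rewritten updateInfo no longer consults ZooKeeper: it walks the configured broker
list in order, stops at the first broker that answers the TopicMetadataRequest, and
rethrows the last error only if every broker fails. A simplified sketch of that retry
shape (not the exact code above):

    // Try each broker in turn; first successful fetch wins.
    def fetchFromFirstAvailable[B, R](brokers: Seq[B])(fetch: B => R): R = {
      var lastError: Throwable = null
      for (broker <- brokers) {
        try {
          return fetch(broker)                    // success: stop here
        } catch {
          case e: Throwable => lastError = e      // remember failure, try the next broker
        }
      }
      throw new RuntimeException("all brokers failed", lastError)
    }
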
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala Tue Aug 14 04:17:27 2012
@@ -32,9 +32,9 @@ object ConsoleProducer { 
                            .withRequiredArg
                            .describedAs("topic")
                            .ofType(classOf[String])
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The zookeeper connection string for the kafka zookeeper instance in the form HOST:PORT[/CHROOT].")
+    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
                            .withRequiredArg
-                           .describedAs("connection_string")
+                           .describedAs("broker-list")
                            .ofType(classOf[String])
     val syncOpt = parser.accepts("sync", "If set, message send requests to the brokers are sent synchronously, one at a time as they arrive.")
     val compressOpt = parser.accepts("compress", "If set, message batches are sent compressed")
@@ -68,7 +68,7 @@ object ConsoleProducer { 
 
 
     val options = parser.parse(args : _*)
-    for(arg <- List(topicOpt, zkConnectOpt)) {
+    for(arg <- List(topicOpt, brokerListOpt)) {
       if(!options.has(arg)) {
         System.err.println("Missing required argument \"" + arg + "\"")
         parser.printHelpOn(System.err)
@@ -77,7 +77,7 @@ object ConsoleProducer { 
     }
 
     val topic = options.valueOf(topicOpt)
-    val zkConnect = options.valueOf(zkConnectOpt)
+    val brokerList = options.valueOf(brokerListOpt)
     val sync = options.has(syncOpt)
     val compress = options.has(compressOpt)
     val batchSize = options.valueOf(batchSizeOpt)
@@ -87,7 +87,7 @@ object ConsoleProducer { 
     val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
 
     val props = new Properties()
-    props.put("zk.connect", zkConnect)
+    props.put("broker.list", brokerList)
     props.put("compression.codec", DefaultCompressionCodec.codec.toString)
     props.put("producer.type", if(sync) "sync" else "async")
     if(options.has(batchSizeOpt))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Tue Aug 14 04:17:27 2012
@@ -23,14 +23,10 @@ import org.apache.log4j.AppenderSkeleton
 import org.apache.log4j.helpers.LogLog
 import kafka.utils.Logging
 import java.util.{Properties, Date}
-import scala.collection._
 
 class KafkaLog4jAppender extends AppenderSkeleton with Logging {
-  var port:Int = 0
-  var host:String = null
   var topic:String = null
   var serializerClass:String = null
-  var zkConnect:String = null
   var brokerList:String = null
   
   private var producer: Producer[String, String] = null
@@ -38,9 +34,6 @@ class KafkaLog4jAppender extends Appende
   def getTopic:String = topic
   def setTopic(topic: String) { this.topic = topic }
 
-  def getZkConnect:String = zkConnect
-  def setZkConnect(zkConnect: String) { this.zkConnect = zkConnect }
-  
   def getBrokerList:String = brokerList
   def setBrokerList(brokerList: String) { this.brokerList = brokerList }
   
@@ -48,17 +41,12 @@ class KafkaLog4jAppender extends Appende
   def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass }
 
   override def activateOptions() {
-    val connectDiagnostic : mutable.ListBuffer[String] = mutable.ListBuffer();
     // check for config parameter validity
     val props = new Properties()
-    if( zkConnect == null) connectDiagnostic += "zkConnect"
-    else props.put("zk.connect", zkConnect);
-    if( brokerList == null) connectDiagnostic += "brokerList"
-    else if( props.isEmpty) props.put("broker.list", brokerList)
-    if(props.isEmpty )
-      throw new MissingConfigException(
-        connectDiagnostic mkString ("One of these connection properties must be specified: ", ", ", ".")
-      )
+    if(brokerList != null)
+      props.put("broker.list", brokerList)
+    if(props.isEmpty)
+      throw new MissingConfigException("The broker.list property should be specified")
     if(topic == null)
       throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
     if(serializerClass == null) {
@@ -68,7 +56,7 @@ class KafkaLog4jAppender extends Appende
     props.put("serializer.class", serializerClass)
     val config : ProducerConfig = new ProducerConfig(props)
     producer = new Producer[String, String](config)
-    LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect))
+    LogLog.debug("Kafka producer connected to " +  config.brokerList)
     LogLog.debug("Logging for topic: " + topic)
   }
   

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala Tue Aug 14 04:17:27 2012
@@ -21,15 +21,12 @@ import kafka.utils._
 import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
 import kafka.serializer.Encoder
 import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
-import org.I0Itec.zkclient.ZkClient
 import kafka.common.{QueueFullException, InvalidConfigException}
 
 class Producer[K,V](config: ProducerConfig,
                     private val eventHandler: EventHandler[K,V]) // for testing only
 extends Logging {
   private val hasShutdown = new AtomicBoolean(false)
-  if(!Utils.propertyExists(config.zkConnect))
-    throw new InvalidConfigException("zk.connect property must be specified in the producer")
   if (config.batchSize > config.queueSize)
     throw new InvalidConfigException("Batch size can't be larger than queue size.")
 
@@ -48,14 +45,12 @@ extends Logging {
     case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
   }
 
-  def this(config: ProducerConfig, zkClient: ZkClient = null) =
+  def this(config: ProducerConfig) =
     this(config,
          new DefaultEventHandler[K,V](config,
                                       Utils.getObject[Partitioner[K]](config.partitionerClass),
                                       Utils.getObject[Encoder[V]](config.serializerClass),
-                                      new ProducerPool(config, if(zkClient == null)
-                                      new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
-                                        config.zkConnectionTimeoutMs, ZKStringSerializer) else zkClient)))
+                                      new ProducerPool(config)))
 
   /**
    * Sends the data, partitioned by key to the topic using either the

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala Tue Aug 14 04:17:27 2012
@@ -19,37 +19,24 @@ package kafka.producer
 
 import async.AsyncProducerConfig
 import java.util.Properties
-import kafka.utils.{ZKConfig, Utils}
-import kafka.common.InvalidConfigException
+import kafka.utils.Utils
 
-class ProducerConfig(val props: Properties) extends ZKConfig(props)
-        with AsyncProducerConfig with SyncProducerConfigShared{
+class ProducerConfig(val props: Properties) extends AsyncProducerConfig with SyncProducerConfigShared{
 
-  /** For bypassing zookeeper based auto partition discovery, use this config   *
-   *  to pass in static broker and per-broker partition information. Format-    *
-   *  brokerid1:host1:port1, brokerid2:host2:port2*/
-  val brokerList = Utils.getString(props, "broker.list", null)
-  if(brokerList != null)
-    throw new InvalidConfigException("broker.list is deprecated. Use zk.connect instead")
+  /** This is for bootstrapping and the producer will only use it for getting metadata
+   * (topics, partitions and replicas). The socket connections for sending the actual data
+   * will be established based on the broker information returned in the metadata. The
+   * format is host1:port1,host2:port2, and the list can be a subset of brokers or
+   * a VIP pointing to a subset of brokers.
+   */
+  val brokerList = Utils.getString(props, "broker.list")
 
   /**
    * If DefaultEventHandler is used, this specifies the number of times to
-   * retry if an error is encountered during send. Currently, it is only
-   * appropriate when broker.list points to a VIP. If the zk.connect option
-   * is used instead, this will not have any effect because with the zk-based
-   * producer, brokers are not re-selected upon retry. So retries would go to
-   * the same (potentially still down) broker. (KAFKA-253 will help address
-   * this.)
+   * retry if an error is encountered during send.
    */
   val numRetries = Utils.getInt(props, "num.retries", 0)
 
-  /** If both broker.list and zk.connect options are specified, throw an exception */
-  if(zkConnect == null)
-    throw new InvalidConfigException("zk.connect property is required")
-
-  if(!Utils.propertyExists(zkConnect) && !Utils.propertyExists(brokerList))
-    throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
-
   /** the partitioner class for partitioning events amongst sub-topics */
   val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")
 

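For illustration, the minimal producer configuration under the new scheme; broker.list
is now the single required connectivity property (values are placeholders):

    import java.util.Properties
    import kafka.producer.ProducerConfig

    val props = new Properties()
    props.put("broker.list", "broker1:9092,broker2:9092") // required bootstrap list
    props.put("num.retries", "3")                         // optional resend attempts
    val config = new ProducerConfig(props)
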
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala Tue Aug 14 04:17:27 2012
@@ -19,38 +19,49 @@ package kafka.producer
 
 import kafka.cluster.Broker
 import java.util.Properties
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZkUtils, Logging}
 import collection.mutable.HashMap
 import java.lang.Object
-import kafka.common.{UnavailableProducerException, NoBrokersForPartitionException}
+import kafka.common.UnavailableProducerException
+import kafka.utils.{Utils, Logging}
+import kafka.api.TopicMetadata
 
-class ProducerPool(val config: ProducerConfig, val zkClient: ZkClient) extends Logging {
-  private val syncProducers = new HashMap[Int, SyncProducer]
-  private val lock = new Object()
 
-  private def addProducer(broker: Broker) {
+object ProducerPool{
+  def createSyncProducer(config: ProducerConfig): SyncProducer = {
+    val brokerList = config.brokerList
+    val brokers = Utils.getAllBrokersFromBrokerList(brokerList)
+    createSyncProducer(config, brokers.head)
+  }
+
+  def createSyncProducer(config: ProducerConfig, broker: Broker): SyncProducer = {
     val props = new Properties()
     props.put("host", broker.host)
     props.put("port", broker.port.toString)
     props.putAll(config.props)
-    val producer = new SyncProducer(new SyncProducerConfig(props))
-    info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
-    syncProducers.put(broker.id, producer)
+    new SyncProducer(new SyncProducerConfig(props))
   }
+}
 
-  /**
-   *  For testing purpose
-   */
-  def addProducer(brokerId: Int, syncProducer: SyncProducer) {
-    syncProducers.put(brokerId, syncProducer)
-  }
+class ProducerPool(val config: ProducerConfig) extends Logging {
+  private val syncProducers = new HashMap[Int, SyncProducer]
+  private val lock = new Object()
 
-  def addProducers(config: ProducerConfig) {
-    lock.synchronized {
-      debug("Connecting to %s for creating sync producers for all brokers in the cluster".format(config.zkConnect))
-      val brokers = ZkUtils.getAllBrokersInCluster(zkClient)
-      brokers.foreach(broker => addProducer(broker))
+  def updateProducer(topicMetaDatas: Seq[TopicMetadata]) {
+    val newBrokers = new collection.mutable.HashSet[Broker]
+    topicMetaDatas.foreach(tmd => {
+      tmd.partitionsMetadata.foreach(pmd => {
+        if(pmd.leader.isDefined)
+          newBrokers+=(pmd.leader.get)
+      })
+    })
+    lock synchronized {
+      newBrokers.foreach(b => {
+        if(syncProducers.contains(b.id)){
+          syncProducers(b.id).close()
+          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
+        } else
+          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
+      })
     }
   }
 
@@ -64,21 +75,6 @@ class ProducerPool(val config: ProducerC
     }
   }
 
-  def getAnyProducer: SyncProducer = {
-    lock.synchronized {
-      if(syncProducers.size == 0) {
-        // refresh the list of brokers from zookeeper
-        info("No sync producers available. Refreshing the available broker list from ZK and creating sync producers")
-        addProducers(config)
-        if(syncProducers.size == 0)
-          throw new NoBrokersForPartitionException("No brokers available")
-      }
-      syncProducers.head._2
-    }
-  }
-
-  def getZkClient: ZkClient = zkClient
-
   /**
    * Closes all the producers in the pool
    */
@@ -88,7 +84,6 @@ class ProducerPool(val config: ProducerC
       val iter = syncProducers.values.iterator
       while(iter.hasNext)
         iter.next.close
-      zkClient.close()
     }
   }
 }

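The pool is now populated lazily from metadata rather than eagerly from ZooKeeper:
updateProducer (re)creates one SyncProducer per partition leader in the response. A
rough sketch of the bootstrap-then-refresh flow, using the types above (broker address
and topic are illustrative):

    import java.util.Properties
    import kafka.api.TopicMetadataRequest
    import kafka.producer.{ProducerConfig, ProducerPool}

    val props = new Properties()
    props.put("broker.list", "broker1:9092")    // illustrative bootstrap broker
    val config = new ProducerConfig(props)
    val pool = new ProducerPool(config)
    val bootstrap = ProducerPool.createSyncProducer(config)
    try {
      val metadata = bootstrap.send(new TopicMetadataRequest(List("my-topic")))
      pool.updateProducer(metadata)             // one SyncProducer per partition leader
    } finally {
      bootstrap.close()
    }
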
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Tue Aug 14 04:17:27 2012
@@ -17,7 +17,6 @@
 
 package kafka.producer.async
 
-import kafka.api.{ProducerRequest, TopicData, PartitionData}
 import kafka.cluster.Partition
 import kafka.common._
 import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
@@ -26,17 +25,17 @@ import kafka.serializer.Encoder
 import kafka.utils.{Utils, Logging}
 import scala.collection.Map
 import scala.collection.mutable.{ListBuffer, HashMap}
+import kafka.api.{TopicMetadata, ProducerRequest, TopicData, PartitionData}
 
-class DefaultEventHandler[K,V](config: ProducerConfig,                               // this api is for testing
-                               private val partitioner: Partitioner[K],              // use the other constructor
+
+class DefaultEventHandler[K,V](config: ProducerConfig,
+                               private val partitioner: Partitioner[K],
                                private val encoder: Encoder[V],
-                               private val producerPool: ProducerPool)
+                               private val producerPool: ProducerPool,
+                               private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
   extends EventHandler[K,V] with Logging {
 
-  val brokerPartitionInfo = new BrokerPartitionInfo(producerPool)
-
-  // add producers to the producer pool
-  producerPool.addProducers(config)
+  val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
 
   private val lock = new Object()
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Tue Aug 14 04:17:27 2012
@@ -151,7 +151,7 @@ class KafkaApis(val requestChannel: Requ
     val sTime = SystemTime.milliseconds
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("producer request %s".format(produceRequest.toString))
-    trace("Broker %s received produce request %s".format(logManager.config.brokerId, produceRequest.toString))
+    trace("Broker %s received produce request %s".format(brokerId, produceRequest.toString))
 
     val response = produceToLocalLog(produceRequest)
     debug("produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
@@ -329,7 +329,7 @@ class KafkaApis(val requestChannel: Requ
       val info = new mutable.ArrayBuffer[PartitionData]()
       val topic = offsetDetail.topic
       val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
-      for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
+      for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ){
         val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
           case Left(err) =>
             BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
@@ -342,18 +342,18 @@ class KafkaApis(val requestChannel: Requ
           case Right(messages) =>
             BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
             BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
-            val leaderReplicaOpt = replicaManager.getReplica(topic, partition, logManager.config.brokerId)
-            assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) + " must exist on leader broker %d".format(logManager.config.brokerId))
+            val leaderReplicaOpt = replicaManager.getReplica(topic, partition, brokerId)
+            assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) + " must exist on leader broker %d".format(brokerId))
             val leaderReplica = leaderReplicaOpt.get
             fetchRequest.replicaId match {
               case FetchRequest.NonFollowerId => // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
                 new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
               case _ => // fetch request from a follower
                 val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId)
-                assert(replicaOpt.isDefined, "No replica %d in replica manager on %d".format(fetchRequest.replicaId, replicaManager.config.brokerId))
+                assert(replicaOpt.isDefined, "No replica %d in replica manager on %d".format(fetchRequest.replicaId, brokerId))
                 val replica = replicaOpt.get
-                debug("leader [%d] for topic [%s] partition [%d] received fetch request from follower [%d]".format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
-                debug("Leader %d returning %d messages for topic %s partition %d to follower %d".format(logManager.config.brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+                debug("leader [%d] for topic [%s] partition [%d] received fetch request from follower [%d]".format(brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+                debug("Leader %d returning %d messages for topic %s partition %d to follower %d".format(brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
                 new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
             }
         }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Tue Aug 14 04:17:27 2012
@@ -21,6 +21,8 @@ import java.util.Properties
 import kafka.utils.{Utils, ZKConfig}
 import kafka.message.Message
 import kafka.consumer.ConsumerConfig
+import java.net.InetAddress
+
 
 /**
  * Configuration settings for the kafka server
@@ -30,7 +32,7 @@ class KafkaConfig(props: Properties) ext
   val port: Int = Utils.getInt(props, "port", 6667)
 
   /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
-  val hostName: String = Utils.getString(props, "hostname", null)
+  val hostName: String = Utils.getString(props, "hostname", InetAddress.getLocalHost.getHostAddress)
 
   /* the broker id for this server */
   val brokerId: Int = Utils.getInt(props, "brokerid")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Tue Aug 14 04:17:27 2012
@@ -361,7 +361,7 @@ class KafkaController(config : KafkaConf
         allLeaders.put(topicPartition, leaderAndISR.leader)
       }
       else{
-        warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment, allBrokerIds))
+        warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment.mkString(","), allBrokerIds))
       }
     }
 
@@ -378,7 +378,7 @@ class KafkaController(config : KafkaConf
 
   private def onBrokerChange(newBrokers: Set[Int] = null){
     /** handle the new brokers, send request for them to initialize the local log **/
-    if(newBrokers != null)
+    if(newBrokers != null && newBrokers.size != 0)
       deliverLeaderAndISRFromZookeeper(newBrokers, allTopics)
 
     /** handle leader election for the partitions whose leader is no longer alive **/
@@ -439,13 +439,12 @@ class KafkaController(config : KafkaConf
         }
       }
     })
-    trace("after acting on broker change, the broker to leaderAndISR request map is".format(brokerToLeaderAndISRInfosMap))
     brokerToLeaderAndISRInfosMap.foreach(m => {
       val broker = m._1
       val leaderAndISRInfos = m._2
       val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRInfos)
       sendRequest(broker, leaderAndISRRequest)
-      info("on broker change, the LeaderAndISRRequest send to brokers [%d] is [%s]".format(leaderAndISRRequest, broker))
+      info("on broker change, the LeaderAndISRRequest send to brokers [%d] is [%s]".format(broker, leaderAndISRRequest))
     })
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Tue Aug 14 04:17:27 2012
@@ -102,10 +102,6 @@ class KafkaServer(val config: KafkaConfi
     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
     Mx4jLoader.maybeLoad
 
-    /**
-     *  Registers this broker in ZK. After this, consumers can connect to broker.
-     *  So this should happen after socket server start.
-     */
     // start the replica manager
     replicaManager.startup()
     // start the controller

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Tue Aug 14 04:17:27 2012
@@ -44,7 +44,7 @@ class KafkaZooKeeper(config: KafkaConfig
 
   private def registerBrokerInZk() {
     info("Registering broker " + brokerIdPath)
-    val hostName = if (config.hostName == null) InetAddress.getLocalHost.getHostAddress else config.hostName
+    val hostName = config.hostName
     val creatorId = hostName + "-" + System.currentTimeMillis
     ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Tue Aug 14 04:17:27 2012
@@ -72,9 +72,9 @@ object ReplayLogProducer extends Logging
       .describedAs("zookeeper url")
       .ofType(classOf[String])
       .defaultsTo("127.0.0.1:2181")
-    val brokerInfoOpt = parser.accepts("brokerinfo", "REQUIRED: broker info (either from zookeeper or a list.")
+    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: the broker list string in the form HOST1:PORT1,HOST2:PORT2.")
       .withRequiredArg
-      .describedAs("broker.list=brokerid:hostname:port or zk.connect=host:port")
+      .describedAs("hostname:port")
       .ofType(classOf[String])
     val inputTopicOpt = parser.accepts("inputtopic", "REQUIRED: The topic to consume from.")
       .withRequiredArg
@@ -117,7 +117,7 @@ object ReplayLogProducer extends Logging
       .defaultsTo(0)
 
     val options = parser.parse(args : _*)
-    for(arg <- List(brokerInfoOpt, inputTopicOpt)) {
+    for(arg <- List(brokerListOpt, inputTopicOpt)) {
       if(!options.has(arg)) {
         System.err.println("Missing required argument \"" + arg + "\"")
         parser.printHelpOn(System.err)
@@ -125,7 +125,7 @@ object ReplayLogProducer extends Logging
       }
     }
     val zkConnect = options.valueOf(zkConnectOpt)
-    val brokerInfo = options.valueOf(brokerInfoOpt)
+    val brokerList = options.valueOf(brokerListOpt)
     val numMessages = options.valueOf(numMessagesOpt).intValue
     val isAsync = options.has(asyncOpt)
     val delayedMSBtwSend = options.valueOf(delayMSBtwBatchOpt).longValue
@@ -152,11 +152,7 @@ object ReplayLogProducer extends Logging
   class ZKConsumerThread(config: Config, stream: KafkaStream[Message]) extends Thread with Logging {
     val shutdownLatch = new CountDownLatch(1)
     val props = new Properties()
-    val brokerInfoList = config.brokerInfo.split("=")
-    if (brokerInfoList(0) == "zk.connect")
-      props.put("zk.connect", brokerInfoList(1))
-    else
-      props.put("broker.list", brokerInfoList(1))
+    props.put("broker.list", config.brokerList)
     props.put("reconnect.interval", Integer.MAX_VALUE.toString)
     props.put("buffer.size", (64*1024).toString)
     props.put("compression.codec", config.compressionCodec.codec.toString)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Tue Aug 14 04:17:27 2012
@@ -31,6 +31,7 @@ import org.I0Itec.zkclient.ZkClient
 import java.util.{Random, Properties}
 import joptsimple.{OptionSpec, OptionSet, OptionParser}
 import kafka.common.KafkaException
+import kafka.cluster.Broker
 
 
 /**
@@ -791,6 +792,20 @@ object Utils extends Logging {
     builder.toString
   }
 
+  def getAllBrokersFromBrokerList(brokerListStr: String): Seq[Broker] = {
+    val brokersStr = Utils.getCSVList(brokerListStr)
+
+    brokersStr.zipWithIndex.map(b =>{
+      val brokerStr = b._1
+      val brokerId = b._2
+      val brokerInfos = brokerStr.split(":")
+      val hostName = brokerInfos(0)
+      val port = brokerInfos(1).toInt
+      val creatorId = hostName + "-" + System.currentTimeMillis()
+      new Broker(brokerId, creatorId, hostName, port)
+    })
+  }
+
   def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
     for(arg <- required) {
       if(!options.has(arg)) {

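A usage sketch of the new helper (addresses are illustrative): each comma-separated
host:port entry becomes a Broker whose id is its position in the list.

    import kafka.utils.Utils

    val brokers = Utils.getAllBrokersFromBrokerList("broker1:9092,broker2:9092")
    // brokers(0) -> id 0, host "broker1", port 9092
    // brokers(1) -> id 1, host "broker2", port 9092
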
Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala Tue Aug 14 04:17:27 2012
@@ -24,13 +24,14 @@ import kafka.message._
 
 object TestEndToEndLatency {
   def main(args: Array[String]) {
-    if(args.length != 2) {
-      System.err.println("USAGE: java " + getClass().getName + " zookeeper_connect num_messages")
+    if(args.length != 3) {
+      System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect num_messages")
       System.exit(1)
     }
-    
-    val zkConnect = args(0)
-    val numMessages = args(1).toInt
+
+    val brokerList = args(0)
+    val zkConnect = args(1)
+    val numMessages = args(2).toInt
     val topic = "test"
     
     val consumerProps = new Properties()
@@ -46,7 +47,7 @@ object TestEndToEndLatency {
     val iter = stream.iterator
 
     val producerProps = new Properties()
-    producerProps.put("zk.connect", zkConnect)
+    producerProps.put("broker.list", brokerList)
     producerProps.put("producer.type", "sync")
     val producer = new Producer[Any, Any](new ProducerConfig(producerProps))
     

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Tue Aug 14 04:17:27 2012
@@ -261,9 +261,6 @@ class ZookeeperConsumerConnectorTest ext
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
 
-    // shutdown one server
-    servers.last.shutdown
-
     // send some messages to each broker
     val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec)
     val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec)
@@ -376,11 +373,10 @@ class ZookeeperConsumerConnectorTest ext
     assertEquals(sentMessages1, receivedMessages1)
   }
 
-  def sendMessagesToBrokerPartition(config: KafkaConfig, topic: String, partition: Int, numMessages: Int,
-                                    compression: CompressionCodec = NoCompressionCodec): List[Message] = {
+  def sendMessagesToBrokerPartition(config: KafkaConfig, topic: String, partition: Int, numMessages: Int, compression: CompressionCodec = NoCompressionCodec): List[Message] = {
     val header = "test-%d-%d".format(config.brokerId, partition)
     val props = new Properties()
-    props.put("zk.connect", zkConnect)
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
     props.put("compression.codec", compression.codec.toString)
     val producer: Producer[Int, Message] = new Producer[Int, Message](new ProducerConfig(props))
@@ -392,20 +388,20 @@ class ZookeeperConsumerConnectorTest ext
     ms.toList
   }
 
-  def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compression: CompressionCodec, numParts: Int): List[Message]= {
+  def sendMessages(config: KafkaConfig, messagesPerNode: Int, header: String, compression: CompressionCodec, numParts: Int): List[Message]= {
     var messages: List[Message] = Nil
     val props = new Properties()
-    props.put("zk.connect", zkConnect)
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
     val producer: Producer[Int, Message] = new Producer[Int, Message](new ProducerConfig(props))
 
     for (partition <- 0 until numParts) {
       val ms = 0.until(messagesPerNode).map(x =>
-        new Message((header + conf.brokerId + "-" + partition + "-" + x).getBytes)).toArray
+        new Message((header + config.brokerId + "-" + partition + "-" + x).getBytes)).toArray
       for (message <- ms)
         messages ::= message
       producer.send(new ProducerData[Int, Message](topic, partition, ms))
-      debug("Sent %d messages to broker %d for topic %s and partition %d".format(ms.size, conf.brokerId, topic, partition))
+      debug("Sent %d messages to broker %d for topic %s and partition %d".format(ms.size, config.brokerId, topic, partition))
     }
     producer.close()
     messages.reverse

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala Tue Aug 14 04:17:27 2012
@@ -42,7 +42,6 @@ class AutoOffsetResetTest extends JUnit3
 
   override def setUp() {
     super.setUp()
-
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)
   }
@@ -70,7 +69,7 @@ class AutoOffsetResetTest extends JUnit3
    * Returns the count of messages received.
    */
   def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
-    val producer: Producer[String, Message] = TestUtils.createProducer(zkConnect)
+    val producer: Producer[String, Message] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs))
 
     for(i <- 0 until numMessages)
       producer.send(new ProducerData[String, Message](topic, topic, new Message("test".getBytes())))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Tue Aug 14 04:17:27 2012
@@ -83,7 +83,7 @@ class FetcherTest extends JUnit3Suite wi
   def sendMessages(messagesPerNode: Int): Int = {
     var count = 0
     for(conf <- configs) {
-      val producer: Producer[String, Message] = TestUtils.createProducer(zkConnect)
+      val producer: Producer[String, Message] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs))
       val ms = 0.until(messagesPerNode).map(x => new Message((conf.brokerId * 5 + x).toString.getBytes)).toArray
       messages += conf.brokerId -> ms
       producer.send(new ProducerData[String, Message](topic, topic, ms))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Tue Aug 14 04:17:27 2012
@@ -104,7 +104,7 @@ class PrimitiveApiTest extends JUnit3Sui
     val topic = "test-topic"
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     val config = new ProducerConfig(props)
 
     val stringProducer1 = new Producer[String, String](config)
@@ -131,7 +131,7 @@ class PrimitiveApiTest extends JUnit3Sui
     val topic = "test-topic"
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     props.put("compression", "true")
     val config = new ProducerConfig(props)
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala Tue Aug 14 04:17:27 2012
@@ -20,22 +20,21 @@ package kafka.integration
 import kafka.consumer.SimpleConsumer
 import org.scalatest.junit.JUnit3Suite
 import java.util.Properties
-import kafka.utils.TestZKUtils
 import kafka.producer.{ProducerConfig, Producer}
 import kafka.message.Message
+import kafka.utils.TestUtils
 
 trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
-  
     val port: Int
     val host = "localhost"
     var producer: Producer[String, Message] = null
     var consumer: SimpleConsumer = null
 
-    override def setUp() {
+  override def setUp() {
       super.setUp
       val props = new Properties()
       props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-      props.put("zk.connect", TestZKUtils.zookeeperConnect)
+      props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
       props.put("buffer.size", "65536")
       props.put("connect.timeout.ms", "100000")
       props.put("reconnect.interval", "10000")

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Tue Aug 14 04:17:27 2012
@@ -72,7 +72,7 @@ class ZookeeperConsumerConnectorTest ext
 
   def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compressed: CompressionCodec): List[Message]= {
     var messages: List[Message] = Nil
-    val producer: kafka.producer.Producer[Int, Message] = TestUtils.createProducer(zkConnect)
+    val producer: kafka.producer.Producer[Int, Message] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs))
     val javaProducer: Producer[Int, Message] = new kafka.javaapi.producer.Producer(producer)
     for (partition <- 0 until numParts) {
       val ms = 0.until(messagesPerNode).map(x =>

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala Tue Aug 14 04:17:27 2012
@@ -21,7 +21,7 @@ import java.util.Properties
 import java.io.File
 import kafka.consumer.SimpleConsumer
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, TestZKUtils, Utils, Logging}
+import kafka.utils.{TestUtils, Utils, Logging}
 import junit.framework.Assert._
 import kafka.api.FetchRequestBuilder
 import kafka.message.Message
@@ -36,6 +36,7 @@ import org.scalatest.junit.JUnit3Suite
 class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
   var logDirZk: File = null
+  var config: KafkaConfig = null
   var serverZk: KafkaServer = null
 
   var simpleConsumerZk: SimpleConsumer = null
@@ -54,7 +55,8 @@ class KafkaLog4jAppenderTest extends JUn
     val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk)
     val logDirZkPath = propsZk.getProperty("log.dir")
     logDirZk = new File(logDirZkPath)
-    serverZk = TestUtils.createServer(new KafkaConfig(propsZk));
+    config = new KafkaConfig(propsZk)
+    serverZk = TestUtils.createServer(config)
     simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024)
   }
 
@@ -108,7 +110,7 @@ class KafkaLog4jAppenderTest extends JUn
     props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
     props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
-    props.put("log4j.appender.KAFKA.ZkConnect", TestZKUtils.zookeeperConnect)
+    props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
     // topic missing
@@ -124,7 +126,7 @@ class KafkaLog4jAppenderTest extends JUn
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
     props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
     props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
-    props.put("log4j.appender.KAFKA.ZkConnect", TestZKUtils.zookeeperConnect)
+    props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
@@ -137,7 +139,7 @@ class KafkaLog4jAppenderTest extends JUn
   }
 
   @Test
-  def testZkConnectLog4jAppends() {
+  def testLog4jAppends() {
     PropertyConfigurator.configure(getLog4jConfigWithZkConnect)
 
     for(i <- 1 to 5)
@@ -160,7 +162,7 @@ class KafkaLog4jAppenderTest extends JUn
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
     props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
     props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
-    props.put("log4j.appender.KAFKA.ZkConnect", TestZKUtils.zookeeperConnect)
+    props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
     props

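With brokerList in place, exercising the appender is plain log4j usage. A minimal sketch mirroring the calls visible in this test (message text illustrative; props is the configuration assembled by the getLog4jConfig* helpers above):

    import org.apache.log4j.{Logger, PropertyConfigurator}

    PropertyConfigurator.configure(props)         // props carries KAFKA.brokerList and KAFKA.Topic
    val logger = Logger.getLogger("kafka.log4j")  // routed to the KAFKA appender
    for (i <- 1 to 5)
      logger.info("test message " + i)
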
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Tue Aug 14 04:17:27 2012
@@ -25,25 +25,22 @@ import org.junit.Test
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common._
-import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.message.Message
 import kafka.producer.async._
 import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils._
-import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection.Map
 import scala.collection.mutable.ListBuffer
 import kafka.utils._
 
-class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
+class AsyncProducerTest extends JUnit3Suite {
   val props = createBrokerConfigs(1)
   val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
-  var brokers: Seq[Broker] = null
 
   override def setUp() {
     super.setUp()
-    brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId))
   }
 
   override def tearDown() {
@@ -64,7 +61,7 @@ class AsyncProducerTest extends JUnit3Su
 
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     props.put("producer.type", "async")
     props.put("queue.size", "10")
     props.put("batch.size", "1")
@@ -88,13 +85,13 @@ class AsyncProducerTest extends JUnit3Su
   def testProduceAfterClosed() {
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     props.put("producer.type", "async")
     props.put("batch.size", "1")
 
     val config = new ProducerConfig(props)
     val produceData = getProduceData(10)
-    val producer = new Producer[String, String](config, zkClient)
+    val producer = new Producer[String, String](config)
     producer.close
 
     try {
@@ -167,35 +164,30 @@ class AsyncProducerTest extends JUnit3Su
     producerDataList.append(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
 
     val props = new Properties()
-    props.put("zk.connect", zkConnect)
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     val broker1 = new Broker(0, "localhost", "localhost", 9092)
     val broker2 = new Broker(1, "localhost", "localhost", 9093)
     // form expected partitions metadata
     val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2))
     val partition2Metadata = new PartitionMetadata(1, Some(broker2), List(broker1, broker2))
     val topic1Metadata = new TopicMetadata("topic1", List(partition1Metadata, partition2Metadata))
     val topic2Metadata = new TopicMetadata("topic2", List(partition1Metadata, partition2Metadata))
 
+    val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
+    topicPartitionInfos.put("topic1", topic1Metadata)
+    topicPartitionInfos.put("topic2", topic2Metadata)
+
     val intPartitioner = new Partitioner[Int] {
       def partition(key: Int, numPartitions: Int): Int = key % numPartitions
     }
     val config = new ProducerConfig(props)
 
-    val syncProducer = getSyncProducer(List("topic1", "topic2"), List(topic1Metadata, topic2Metadata))
-
-    val producerPool = EasyMock.createMock(classOf[ProducerPool])
-    producerPool.getZkClient
-    EasyMock.expectLastCall().andReturn(zkClient)
-    producerPool.addProducers(config)
-    EasyMock.expectLastCall()
-    producerPool.getAnyProducer
-    EasyMock.expectLastCall().andReturn(syncProducer).times(2)
-    EasyMock.replay(producerPool)
+    val producerPool = new ProducerPool(config)
     val handler = new DefaultEventHandler[Int,String](config,
                                                       partitioner = intPartitioner,
                                                       encoder = null.asInstanceOf[Encoder[String]],
-                                                      producerPool = producerPool)
-
+                                                      producerPool = producerPool,
+                                                      topicPartitionInfos)
 
     val topic1Broker1Data = new ListBuffer[ProducerData[Int,Message]]
     topic1Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
@@ -217,25 +210,26 @@ class AsyncProducerTest extends JUnit3Su
 
     val actualResult = handler.partitionAndCollate(producerDataList)
     assertEquals(expectedResult, actualResult)
-    EasyMock.verify(syncProducer)
-    EasyMock.verify(producerPool)
   }
 
   @Test
   def testSerializeEvents() {
     val produceData = TestUtils.getMsgStrings(5).map(m => new ProducerData[String,String]("topic1",m))
     val props = new Properties()
-    props.put("zk.connect", zkConnect)
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     val config = new ProducerConfig(props)
     // form expected partitions metadata
     val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
+    val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
+    topicPartitionInfos.put("topic1", topic1Metadata)
+
+    val producerPool = new ProducerPool(config)
 
-    val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata))
-    val producerPool = getMockProducerPool(config, syncProducer)
     val handler = new DefaultEventHandler[String,String](config,
                                                          partitioner = null.asInstanceOf[Partitioner[String]],
                                                          encoder = new StringEncoder,
-                                                         producerPool = producerPool)
+                                                         producerPool = producerPool,
+                                                         topicPartitionInfos)
 
     val serializedData = handler.serialize(produceData)
     val decoder = new StringDecoder
@@ -248,20 +243,22 @@ class AsyncProducerTest extends JUnit3Su
     val producerDataList = new ListBuffer[ProducerData[String,Message]]
     producerDataList.append(new ProducerData[String,Message]("topic1", "key1", new Message("msg1".getBytes)))
     val props = new Properties()
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     val config = new ProducerConfig(props)
 
     // form expected partitions metadata
     val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
 
-    val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata))
+    val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
+    topicPartitionInfos.put("topic1", topic1Metadata)
 
-    val producerPool = getMockProducerPool(config, syncProducer)
+    val producerPool = new ProducerPool(config)
 
     val handler = new DefaultEventHandler[String,String](config,
                                                          partitioner = new NegativePartitioner,
                                                          encoder = null.asInstanceOf[Encoder[String]],
-                                                         producerPool = producerPool)
+                                                         producerPool = producerPool,
+                                                         topicPartitionInfos)
     try {
       handler.partitionAndCollate(producerDataList)
       fail("Should fail with InvalidPartitionException")
@@ -269,29 +266,29 @@ class AsyncProducerTest extends JUnit3Su
     catch {
       case e: InvalidPartitionException => // expected, do nothing
     }
-    EasyMock.verify(syncProducer)
-    EasyMock.verify(producerPool)
   }
 
   @Test
   def testNoBroker() {
     val props = new Properties()
-    props.put("zk.connect", zkConnect)
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
 
     val config = new ProducerConfig(props)
     // create topic metadata with 0 partitions
     val topic1Metadata = new TopicMetadata("topic1", Seq.empty)
 
-    val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata))
+    val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
+    topicPartitionInfos.put("topic1", topic1Metadata)
 
-    val producerPool = getMockProducerPool(config, syncProducer)
+    val producerPool = new ProducerPool(config)
 
     val producerDataList = new ListBuffer[ProducerData[String,String]]
     producerDataList.append(new ProducerData[String,String]("topic1", "msg1"))
     val handler = new DefaultEventHandler[String,String](config,
                                                          partitioner = null.asInstanceOf[Partitioner[String]],
                                                          encoder = new StringEncoder,
-                                                         producerPool = producerPool)
+                                                         producerPool = producerPool,
+                                                         topicPartitionInfos)
     try {
       handler.handle(producerDataList)
       fail("Should fail with NoBrokersForPartitionException")
@@ -299,14 +296,12 @@ class AsyncProducerTest extends JUnit3Su
     catch {
       case e: NoBrokersForPartitionException => // expected, do nothing
     }
-    EasyMock.verify(syncProducer)
-    EasyMock.verify(producerPool)
   }
 
   @Test
   def testIncompatibleEncoder() {
     val props = new Properties()
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     val config = new ProducerConfig(props)
 
     val producer=new Producer[String, String](config)
@@ -323,28 +318,23 @@ class AsyncProducerTest extends JUnit3Su
   @Test
   def testRandomPartitioner() {
     val props = new Properties()
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     val config = new ProducerConfig(props)
 
     // create topic metadata with 0 partitions
     val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
     val topic2Metadata = getTopicMetadata("topic2", 0, 0, "localhost", 9092)
 
-    val syncProducer = getSyncProducer(List("topic1", "topic2"), List(topic1Metadata, topic2Metadata))
-
-    val producerPool = EasyMock.createMock(classOf[ProducerPool])
-    producerPool.getZkClient
-    EasyMock.expectLastCall().andReturn(zkClient)
-    producerPool.addProducers(config)
-    EasyMock.expectLastCall()
-    producerPool.getAnyProducer
-    EasyMock.expectLastCall().andReturn(syncProducer).times(2)
-    EasyMock.replay(producerPool)
+    val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
+    topicPartitionInfos.put("topic1", topic1Metadata)
+    topicPartitionInfos.put("topic2", topic2Metadata)
 
+    val producerPool = new ProducerPool(config)
     val handler = new DefaultEventHandler[String,String](config,
                                                          partitioner = null.asInstanceOf[Partitioner[String]],
                                                          encoder = null.asInstanceOf[Encoder[String]],
-                                                         producerPool = producerPool)
+                                                         producerPool = producerPool,
+                                                         topicPartitionInfos)
     val producerDataList = new ListBuffer[ProducerData[String,Message]]
     producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg1".getBytes)))
     producerDataList.append(new ProducerData[String,Message]("topic2", new Message("msg2".getBytes)))
@@ -360,7 +350,6 @@ class AsyncProducerTest extends JUnit3Su
       case None =>
         fail("Failed to collate requests by topic, partition")
     }
-    EasyMock.verify(producerPool)
   }
 
   @Test
@@ -369,40 +358,24 @@ class AsyncProducerTest extends JUnit3Su
     props.put("serializer.class", "kafka.serializer.StringEncoder")
     props.put("producer.type", "async")
     props.put("batch.size", "5")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
 
     val config = new ProducerConfig(props)
 
     val topic = "topic1"
     val topic1Metadata = getTopicMetadata(topic, 0, 0, "localhost", 9092)
+    val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
+    topicPartitionInfos.put("topic1", topic1Metadata)
 
-    val msgs = TestUtils.getMsgStrings(10)
-    val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
-    mockSyncProducer.send(new TopicMetadataRequest(List(topic)))
-    EasyMock.expectLastCall().andReturn(List(topic1Metadata))
-    mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.take(5))))
-    EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L)))
-    mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.takeRight(5))))
-    EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L)))
-	  EasyMock.replay(mockSyncProducer)
+    val producerPool = new ProducerPool(config)
 
-    val producerPool = EasyMock.createMock(classOf[ProducerPool])
-    producerPool.getZkClient
-    EasyMock.expectLastCall().andReturn(zkClient)
-    producerPool.addProducers(config)
-    EasyMock.expectLastCall()
-    producerPool.getAnyProducer
-    EasyMock.expectLastCall().andReturn(mockSyncProducer)
-    producerPool.getProducer(0)
-    EasyMock.expectLastCall().andReturn(mockSyncProducer).times(2)
-    producerPool.close()
-    EasyMock.expectLastCall()
-    EasyMock.replay(producerPool)
+    val msgs = TestUtils.getMsgStrings(10)
 
     val handler = new DefaultEventHandler[String,String]( config,
                                                           partitioner = null.asInstanceOf[Partitioner[String]],
                                                           encoder = new StringEncoder,
-                                                          producerPool = producerPool )
+                                                          producerPool = producerPool,
+                                                          topicPartitionInfos)
 
     val producer = new Producer[String, String](config, handler)
     try {
@@ -413,36 +386,30 @@ class AsyncProducerTest extends JUnit3Su
     } catch {
       case e: Exception => fail("Not expected", e)
     }
-
-    EasyMock.verify(mockSyncProducer)
-    EasyMock.verify(producerPool)
   }
 
   @Test
   def testFailedSendRetryLogic() {
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
 
     val config = new ProducerConfig(props)
 
     val topic1 = "topic1"
     val topic1Metadata = getTopicMetadata(topic1, Array(0, 1), 0, "localhost", 9092)
-    val msgs = TestUtils.getMsgStrings(2)
+    val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
+    topicPartitionInfos.put("topic1", topic1Metadata)
 
-    // producer used to return topic metadata
-    val metadataSyncProducer = EasyMock.createMock(classOf[SyncProducer])
-    metadataSyncProducer.send(new TopicMetadataRequest(List(topic1)))
-    EasyMock.expectLastCall().andReturn(List(topic1Metadata)).times(3)
-    EasyMock.replay(metadataSyncProducer)
+    val msgs = TestUtils.getMsgStrings(2)
 
     // produce request for topic1 and partitions 0 and 1.  Let the first request fail
     // entirely.  The second request will succeed for partition 1 but fail for partition 0.
     // On the third try for partition 0, let it succeed.
-    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), 0)
+    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), TestUtils.messagesToSet(msgs), 0)
     val response1 =
       new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L))
-    val request2 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs))
+    val request2 = TestUtils.produceRequest(topic1, 0, TestUtils.messagesToSet(msgs))
     val response2 = new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
     EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
@@ -451,13 +419,8 @@ class AsyncProducerTest extends JUnit3Su
     EasyMock.replay(mockSyncProducer)
 
     val producerPool = EasyMock.createMock(classOf[ProducerPool])
-    EasyMock.expect(producerPool.getZkClient).andReturn(zkClient)
-    EasyMock.expect(producerPool.addProducers(config))
-    EasyMock.expect(producerPool.getAnyProducer).andReturn(metadataSyncProducer)
     EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
-    EasyMock.expect(producerPool.getAnyProducer).andReturn(metadataSyncProducer)
     EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
-    EasyMock.expect(producerPool.getAnyProducer).andReturn(metadataSyncProducer)
     EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
     EasyMock.expect(producerPool.close())
     EasyMock.replay(producerPool)
@@ -465,7 +428,8 @@ class AsyncProducerTest extends JUnit3Su
     val handler = new DefaultEventHandler[Int,String](config,
                                                       partitioner = new FixedValuePartitioner(),
                                                       encoder = new StringEncoder,
-                                                      producerPool = producerPool)
+                                                      producerPool = producerPool,
+                                                      topicPartitionInfos)
     try {
       val data = List(
         new ProducerData[Int,String](topic1, 0, msgs),
@@ -477,7 +441,6 @@ class AsyncProducerTest extends JUnit3Su
       case e: Exception => fail("Not expected", e)
     }
 
-    EasyMock.verify(metadataSyncProducer)
     EasyMock.verify(mockSyncProducer)
     EasyMock.verify(producerPool)
   }
@@ -511,16 +474,13 @@ class AsyncProducerTest extends JUnit3Su
   def testInvalidConfiguration() {
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("broker.list", TestZKUtils.zookeeperConnect)
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     props.put("producer.type", "async")
-
     try {
       new ProducerConfig(props)
       fail("should complain about wrong config")
     }
     catch {
-      case e: InvalidConfigException => //expected
+      case e: KafkaException => //expected
     }
   }
 
@@ -531,33 +491,6 @@ class AsyncProducerTest extends JUnit3Su
     producerDataList
   }
 
-  private def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
-    val encoder = new StringEncoder
-    new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*)
-  }
-
-  private def getSyncProducer(topic: Seq[String], topicMetadata: Seq[TopicMetadata]): SyncProducer = {
-    val syncProducer = EasyMock.createMock(classOf[SyncProducer])
-    topic.zip(topicMetadata).foreach { topicAndMetadata =>
-      syncProducer.send(new TopicMetadataRequest(List(topicAndMetadata._1)))
-      EasyMock.expectLastCall().andReturn(List(topicAndMetadata._2))
-    }
-    EasyMock.replay(syncProducer)
-    syncProducer
-  }
-
-  private def getMockProducerPool(config: ProducerConfig, syncProducer: SyncProducer): ProducerPool = {
-    val producerPool = EasyMock.createMock(classOf[ProducerPool])
-    producerPool.getZkClient
-    EasyMock.expectLastCall().andReturn(zkClient)
-    producerPool.addProducers(config)
-    EasyMock.expectLastCall()
-    producerPool.getAnyProducer
-    EasyMock.expectLastCall().andReturn(syncProducer)
-    EasyMock.replay(producerPool)
-    producerPool
-  }
-
   private def getTopicMetadata(topic: String, partition: Int, brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
     getTopicMetadata(topic, List(partition), brokerId, brokerHost, brokerPort)
   }

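The pattern repeated across these AsyncProducerTest hunks: rather than mocking metadata lookups, the tests now hand a topic-to-metadata map directly to DefaultEventHandler. Condensed into one sketch (imports as in the test file above; config is a ProducerConfig with broker.list set, and the topic, host, and port are the same illustrative values the tests use):

    val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
    topicPartitionInfos.put("topic1", getTopicMetadata("topic1", 0, 0, "localhost", 9092))

    val producerPool = new ProducerPool(config)
    val handler = new DefaultEventHandler[String, String](config,
                                                          partitioner = null.asInstanceOf[Partitioner[String]],
                                                          encoder = new StringEncoder,
                                                          producerPool = producerPool,
                                                          topicPartitionInfos)
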
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Tue Aug 14 04:17:27 2012
@@ -18,10 +18,6 @@
 package kafka.producer
 
 import org.scalatest.junit.JUnit3Suite
-import java.util.Properties
-import kafka.admin.CreateTopicCommand
-import kafka.api.FetchRequestBuilder
-import kafka.common.FailedToSendMessageException
 import kafka.consumer.SimpleConsumer
 import kafka.message.Message
 import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
@@ -31,9 +27,13 @@ import org.junit.Assert._
 import org.junit.Test
 import kafka.utils._
 import java.util
+import kafka.admin.{AdminUtils, CreateTopicCommand}
+import util.Properties
+import kafka.api.FetchRequestBuilder
+import kafka.common.{KafkaException, ErrorMapping, FailedToSendMessageException}
 
 
-class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
   private val brokerId1 = 0
   private val brokerId2 = 1
   private val ports = TestUtils.choosePorts(2)
@@ -44,19 +44,21 @@ class ProducerTest extends JUnit3Suite w
   private var consumer2: SimpleConsumer = null
   private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
+  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+  private val config1 = new KafkaConfig(props1) {
+    override val hostName = "localhost"
+    override val numPartitions = 4
+  }
+  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  private val config2 = new KafkaConfig(props2) {
+    override val hostName = "localhost"
+    override val numPartitions = 4
+  }
+
   override def setUp() {
     super.setUp()
     // set up 2 brokers with 4 partitions each
-    val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
-    val config1 = new KafkaConfig(props1) {
-      override val numPartitions = 4
-    }
     server1 = TestUtils.createServer(config1)
-
-    val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
-    val config2 = new KafkaConfig(props2) {
-      override val numPartitions = 4
-    }
     server2 = TestUtils.createServer(config2)
 
     val props = new Properties()
@@ -82,12 +84,62 @@ class ProducerTest extends JUnit3Suite w
     super.tearDown()
   }
 
+
+  def testUpdateBrokerPartitionInfo() {
+    CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
+    assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
+      AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
+        zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
+
+    val props1 = new util.Properties()
+    props1.put("broker.list", "localhost:80,localhost:81")
+    props1.put("serializer.class", "kafka.serializer.StringEncoder")
+    val producerConfig1 = new ProducerConfig(props1)
+    val producer1 = new Producer[String, String](producerConfig1)
+    try {
+      producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      fail("Test should fail because the broker list provided is not valid")
+    } catch {
+      case e: KafkaException => // expected, do nothing
+      case oe => fail("Should fail with KafkaException", oe)
+    } finally {
+      producer1.close()
+    }
+
+    val props2 = new util.Properties()
+    props2.put("broker.list", "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq( config1)))
+    props2.put("serializer.class", "kafka.serializer.StringEncoder")
+    val producerConfig2= new ProducerConfig(props2)
+    val producer2 = new Producer[String, String](producerConfig2)
+    try{
+      producer2.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+    } catch {
+      case e => fail("Should succeed sending the message", e)
+    } finally {
+      producer2.close()
+    }
+
+    val props3 = new util.Properties()
+    props3.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    props3.put("serializer.class", "kafka.serializer.StringEncoder")
+    val producerConfig3 = new ProducerConfig(props3)
+    val producer3 = new Producer[String, String](producerConfig3)
+    try {
+      producer3.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+    } catch {
+      case e => fail("Should succeed sending the message", e)
+    } finally {
+      producer3.close()
+    }
+  }
+
   @Test
-  def testZKSendToNewTopic() {
+  def testSendToNewTopic() {
     val props1 = new util.Properties()
     props1.put("serializer.class", "kafka.serializer.StringEncoder")
     props1.put("partitioner.class", "kafka.utils.StaticPartitioner")
-    props1.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props1.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     props1.put("producer.request.required.acks", "2")
     props1.put("producer.request.timeout.ms", "1000")
 
@@ -96,15 +148,18 @@ class ProducerTest extends JUnit3Suite w
     props2.put("producer.request.required.acks", "3")
     props2.put("producer.request.timeout.ms", "1000")
 
-    val config1 = new ProducerConfig(props1)
-    val config2 = new ProducerConfig(props2)
+    val producerConfig1 = new ProducerConfig(props1)
+    val producerConfig2 = new ProducerConfig(props2)
 
     // create topic with 1 partition and await leadership
     CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
+    assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
+      AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
+        zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
-    val producer1 = new Producer[String, String](config1)
-    val producer2 = new Producer[String, String](config2)
+    val producer1 = new Producer[String, String](producerConfig1)
+    val producer2 = new Producer[String, String](producerConfig2)
     // Available partition ids should be 0.
     producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
     producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
@@ -143,16 +198,19 @@ class ProducerTest extends JUnit3Suite w
 
 
   @Test
-  def testZKSendWithDeadBroker() {
+  def testSendWithDeadBroker() {
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
     props.put("partitioner.class", "kafka.utils.StaticPartitioner")
     props.put("producer.request.timeout.ms", "2000")
 //    props.put("producer.request.required.acks", "-1")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
 
     // create topic
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
+    assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
+      AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
+        zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 1, 500)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 2, 500)
@@ -204,13 +262,16 @@ class ProducerTest extends JUnit3Suite w
     props.put("serializer.class", "kafka.serializer.StringEncoder")
     props.put("partitioner.class", "kafka.utils.StaticPartitioner")
     props.put("producer.request.timeout.ms", String.valueOf(timeoutMs))
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
 
     val config = new ProducerConfig(props)
     val producer = new Producer[String, String](config)
 
     // create topics in ZK
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
+    assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
+      AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
+        zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
     // do a simple test to make sure plumbing is okay


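testUpdateBrokerPartitionInfo above pins down the bootstrap semantics of broker.list: a send succeeds as long as at least one listed broker is reachable, and fails with KafkaException when none are. Condensed from the test (the localhost:80 entry is deliberately dead; config1 is one of the live broker configs defined in the test above):

    val props = new util.Properties()
    props.put("broker.list", "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq(config1)))
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))
    try {
      // One dead address plus one live broker is enough to bootstrap metadata.
      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
    } finally {
      producer.close()
    }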
