kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1303473 [1/2] - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/cluster/ main/scala/kafka/consumer/ main/scala/kafka/javaapi/ main/scala/kafka/log/ main/scala/kafka/producer/ main/scala/kafka/producer/as...
Date Wed, 21 Mar 2012 17:29:34 GMT
Author: nehanarkhede
Date: Wed Mar 21 17:29:32 2012
New Revision: 1303473

URL: http://svn.apache.org/viewvc?rev=1303473&view=rev
Log:
KAFKA-300 Leader election; patched by nehanarkhede; reviewed by junrao

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Wed Mar 21 17:29:32 2012
@@ -80,10 +80,6 @@ object AdminUtils extends Logging {
       for (i <- 0 until replicaAssignmentList.size) {
         val zkPath = ZkUtils.getTopicPartitionReplicasPath(topic, i.toString)
         ZkUtils.updatePersistentPath(zkClient, zkPath, Utils.seqToCSV(replicaAssignmentList(i)))
-        // TODO: Remove this with leader election patch
-        // assign leader for the partition i
-//        ZkUtils.updateEphemeralPath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, i.toString),
-//          replicaAssignmentList(i).head)
         debug("Updated path %s with %s for replica assignment".format(zkPath, Utils.seqToCSV(replicaAssignmentList(i))))
       }
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Wed Mar 21 17:29:32 2012
@@ -16,14 +16,12 @@
  */
 package kafka.cluster
 
-case class Partition(brokerId: Int, partId: Int, topic: String = "") extends Ordered[Partition] {
-
-  def name = partId
-
-  def compare(that: Partition) =
-    if (this.topic == that.topic)
-      this.partId - that.partId
-    else
-      this.topic.compareTo(that.topic)
-
-}
+/**
+ * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
+ * TODO: Commit queue to be added as part of KAFKA-46. Add AR, ISR, CUR, RAR state maintenance as part of KAFKA-302
+ */
+case class Partition(topic: String, val partId: Int, var leader: Option[Replica] = None,
+                     assignedReplicas: Set[Replica] = Set.empty[Replica],
+                     inSyncReplicas: Set[Replica] = Set.empty[Replica],
+                     catchUpReplicas: Set[Replica] = Set.empty[Replica],
+                     reassignedReplicas: Set[Replica] = Set.empty[Replica])

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala?rev=1303473&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala Wed Mar 21 17:29:32 2012
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.cluster
+
+import kafka.log.Log
+
+case class Replica(brokerId: Int, partition: Partition, topic: String,
+                   var log: Option[Log] = None, var hw: Long = -1, var leo: Long = -1, isLocal: Boolean = false)
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala Wed Mar 21 17:29:32 2012
@@ -20,7 +20,7 @@ package kafka.consumer
 import java.io.IOException
 import java.util.concurrent.CountDownLatch
 import kafka.api.{FetchRequestBuilder, OffsetRequest}
-import kafka.cluster.{Partition, Broker}
+import kafka.cluster.Broker
 import kafka.common.ErrorMapping
 import kafka.message.ByteBufferMessageSet
 import kafka.utils._
@@ -48,7 +48,7 @@ class FetcherRunnable(val name: String,
 
   override def run() {
     for (infopti <- partitionTopicInfos)
-      info(name + " start fetching topic: " + infopti.topic + " part: " + infopti.partition.partId + " offset: "
+      info(name + " start fetching topic: " + infopti.topic + " part: " + infopti.partitionId + " offset: "
         + infopti.getFetchOffset + " from " + broker.host + ":" + broker.port)
 
     var reqId = 0
@@ -61,7 +61,7 @@ class FetcherRunnable(val name: String,
           maxWait(0).
           minBytes(0)
         partitionTopicInfos.foreach(pti =>
-          builder.addFetch(pti.topic, pti.partition.partId, pti.getFetchOffset(), config.fetchSize)
+          builder.addFetch(pti.topic, pti.partitionId, pti.getFetchOffset(), config.fetchSize)
         )
 
         val fetchRequest = builder.build()
@@ -70,13 +70,13 @@ class FetcherRunnable(val name: String,
 
         var read = 0L
         for(infopti <- partitionTopicInfos) {
-          val messages = response.messageSet(infopti.topic, infopti.partition.partId).asInstanceOf[ByteBufferMessageSet]
+          val messages = response.messageSet(infopti.topic, infopti.partitionId).asInstanceOf[ByteBufferMessageSet]
           try {
             var done = false
             if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) {
               info("offset for " + infopti + " out of range")
               // see if we can fix this error
-              val resetOffset = resetConsumerOffsets(infopti.topic, infopti.partition)
+              val resetOffset = resetConsumerOffsets(infopti.topic, infopti.partitionId)
               if(resetOffset >= 0) {
                 infopti.resetFetchOffset(resetOffset)
                 infopti.resetConsumeOffset(resetOffset)
@@ -126,7 +126,7 @@ class FetcherRunnable(val name: String,
   private def shutdownComplete() = shutdownLatch.countDown
 
   private def resetConsumerOffsets(topic : String,
-                                   partition: Partition) : Long = {
+                                   partitionId: Int) : Long = {
     var offset : Long = 0
     config.autoOffsetReset match {
       case OffsetRequest.SmallestTimeString => offset = OffsetRequest.EarliestTime
@@ -135,13 +135,13 @@ class FetcherRunnable(val name: String,
     }
 
     // get mentioned offset from the broker
-    val offsets = simpleConsumer.getOffsetsBefore(topic, partition.partId, offset, 1)
+    val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, offset, 1)
     val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
 
     // reset manually in zookeeper
-    info("updating partition " + partition.name + " for topic " + topic + " with " +
+    info("updating partition " + partitionId + " for topic " + topic + " with " +
             (if(offset == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + offsets(0))
-    ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition.name, offsets(0).toString)
+    ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partitionId, offsets(0).toString)
 
     offsets(0)
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Wed Mar 21 17:29:32 2012
@@ -20,13 +20,12 @@ package kafka.consumer
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import kafka.message._
-import kafka.cluster._
 import kafka.utils.Logging
 import kafka.common.ErrorMapping
 
 private[consumer] class PartitionTopicInfo(val topic: String,
                                            val brokerId: Int,
-                                           val partition: Partition,
+                                           val partitionId: Int,
                                            private val chunkQueue: BlockingQueue[FetchedDataChunk],
                                            private val consumedOffset: AtomicLong,
                                            private val fetchedOffset: AtomicLong,
@@ -74,6 +73,6 @@ private[consumer] class PartitionTopicIn
     chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
   }
 
-  override def toString(): String = topic + ":" + partition.toString + ": fetched offset = " + fetchedOffset.get +
+  override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
     ": consumed offset = " + consumedOffset.get
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Wed Mar 21 17:29:32 2012
@@ -91,7 +91,7 @@ private[kafka] class ZookeeperConsumerCo
   private val rebalanceLock = new Object
   private var fetcher: Option[Fetcher] = None
   private var zkClient: ZkClient = null
-  private val topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
+  private val topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
   // queues : (topic,consumerThreadId) -> queue
   private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
@@ -202,7 +202,7 @@ private[kafka] class ZookeeperConsumerCo
   }
 
   // this API is used by unit tests only
-  def getTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]] = topicRegistry
+  def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry
 
   private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
     info("begin registering consumer " + consumerIdString + " in ZK")
@@ -241,7 +241,7 @@ private[kafka] class ZookeeperConsumerCo
       for (info <- infos.values) {
         val newOffset = info.getConsumeOffset
         try {
-          updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name,
+          updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partitionId,
             newOffset.toString)
         } catch {
           case t: Throwable =>
@@ -261,7 +261,7 @@ private[kafka] class ZookeeperConsumerCo
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
       for(partition <- infos.values) {
         builder.append("\n    {")
-        builder.append{partition.partition.name}
+        builder.append{partition}
         builder.append(",fetch offset:" + partition.getFetchOffset)
         builder.append(",consumer offset:" + partition.getConsumeOffset)
         builder.append("}")
@@ -278,10 +278,9 @@ private[kafka] class ZookeeperConsumerCo
     getLatestOffset(topic, brokerId, partitionId) - getConsumedOffset(topic, brokerId, partitionId)
 
   def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long = {
-    val partition = new Partition(brokerId, partitionId)
     val partitionInfos = topicRegistry.get(topic)
     if (partitionInfos != null) {
-      val partitionInfo = partitionInfos.get(partition)
+      val partitionInfo = partitionInfos.get(partitionId)
       if (partitionInfo != null)
         return partitionInfo.getConsumeOffset
     }
@@ -289,7 +288,7 @@ private[kafka] class ZookeeperConsumerCo
     //otherwise, try to get it from zookeeper
     try {
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
-      val znode = topicDirs.consumerOffsetDir + "/" + partition.name
+      val znode = topicDirs.consumerOffsetDir + "/" + partitionId
       val offsetString = readDataMaybeNull(zkClient, znode)
       if (offsetString != null)
         return offsetString.toLong
@@ -383,7 +382,7 @@ private[kafka] class ZookeeperConsumerCo
       info("Releasing partition ownership")
       for ((topic, infos) <- topicRegistry) {
         for(partition <- infos.keys) {
-          val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition.partId.toString)
+          val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition.toString)
           deletePath(zkClient, partitionOwnerPath)
           debug("Consumer " + consumerIdString + " releasing " + partitionOwnerPath)
         }
@@ -475,7 +474,7 @@ private[kafka] class ZookeeperConsumerCo
       var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
       for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
         topicRegistry.remove(topic)
-        topicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo])
+        topicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
 
         val topicDirs = new ZKGroupTopicDirs(group, topic)
         val curConsumers = consumersPerTopicMap.get(topic).get
@@ -566,7 +565,7 @@ private[kafka] class ZookeeperConsumerCo
         for (partition <- partitionInfos.values)
           allPartitionInfos ::= partition
       info("Consumer " + consumerIdString + " selected partitions : " +
-        allPartitionInfos.sortWith((s,t) => s.partition < t.partition).map(_.toString).mkString(","))
+        allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(","))
 
       fetcher match {
         case Some(f) =>
@@ -648,18 +647,17 @@ private[kafka] class ZookeeperConsumerCo
       else
         offset = offsetString.toLong
 
-      val partitionObject = new Partition(leader, partition.toInt, topic)
       val queue = queues.get((topic, consumerThreadId))
       val consumedOffset = new AtomicLong(offset)
       val fetchedOffset = new AtomicLong(offset)
       val partTopicInfo = new PartitionTopicInfo(topic,
                                                  leader,
-                                                 partitionObject,
+                                                 partition.toInt,
                                                  queue,
                                                  consumedOffset,
                                                  fetchedOffset,
                                                  new AtomicInteger(config.fetchSize))
-      partTopicInfoMap.put(partitionObject, partTopicInfo)
+      partTopicInfoMap.put(partition.toInt, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala Wed Mar 21 17:29:32 2012
@@ -26,7 +26,6 @@ class ProducerRequest(val correlationId:
                       val ackTimeout: Int,
                       val data: Array[TopicData]) extends Request(RequestKeys.Produce) {
 	
-  import Implicits._
   val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
 
   def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Wed Mar 21 17:29:32 2012
@@ -26,7 +26,7 @@ import kafka.common._
 import kafka.api.OffsetRequest
 import java.util._
 
-private[log] object Log {
+private[kafka] object Log {
   val FileSuffix = ".kafka"
 
   /**
@@ -100,7 +100,7 @@ private[log] class LogSegment(val file: 
  * An append-only log for storing messages. 
  */
 @threadsafe
-private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) extends Logging {
+private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) extends Logging {
 
   /* A lock that guards all modifications to the log */
   private val lock = new Object

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Wed Mar 21 17:29:32 2012
@@ -25,6 +25,7 @@ import kafka.server.{KafkaConfig, KafkaZ
 import kafka.common.{InvalidTopicException, InvalidPartitionException}
 import kafka.api.OffsetRequest
 import org.I0Itec.zkclient.ZkClient
+import kafka.cluster.{Partition, Replica}
 
 /**
  * The guy who creates and hands out logs
@@ -51,6 +52,7 @@ private[kafka] class LogManager(val conf
   private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
   private val logRetentionSize = config.logRetentionSize
   private val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
+  private var replicas: Map[(String, Int), Replica] = new mutable.HashMap[(String, Int), Replica]()
 
   /* Initialize a log for each subdirectory of the main log directory */
   private val logs = new Pool[String, Pool[Int, Log]]()
@@ -120,7 +122,9 @@ private[kafka] class LogManager(val conf
       new Log(d, maxSize, flushInterval, false)
     }
   }
-  
+
+  def getReplicaForPartition(topic: String, partition: Int): Option[Replica] = replicas.get((topic, partition))
+
   /**
    * Return the Pool (partitions) for a specific log
    */
@@ -145,17 +149,23 @@ private[kafka] class LogManager(val conf
 
   def getOffsets(offsetRequest: OffsetRequest): Array[Long] = {
     val log = getLog(offsetRequest.topic, offsetRequest.partition)
-    if (log != null) return log.getOffsetsBefore(offsetRequest)
-    Log.getEmptyOffsets(offsetRequest)
+    log match {
+      case Some(l) => l.getOffsetsBefore(offsetRequest)
+      case None => Log.getEmptyOffsets(offsetRequest)
+    }
   }
 
   /**
    * Get the log if exists
    */
-  def getLog(topic: String, partition: Int): Log = {
+  def getLog(topic: String, partition: Int): Option[Log] = {
     val parts = getLogPool(topic, partition)
-    if (parts == null) return null
-    parts.get(partition)
+    if (parts == null) None
+    else {
+      val log = parts.get(partition)
+      if(log == null) None
+      else Some(log)
+    }
   }
 
   /**
@@ -188,9 +198,40 @@ private[kafka] class LogManager(val conf
         info("Created log for '" + topic + "'-" + partition)
     }
 
+    // add this log to the list of replicas hosted on this broker
+    addReplicaForPartition(topic, partition)
     log
   }
-  
+
+  def addReplicaForPartition(topic: String, partitionId: Int): Replica = {
+    val replica = replicas.get((topic, partitionId))
+    val log = getLog(topic, partitionId)
+    replica match {
+      case Some(r) =>
+        r.log match {
+          case None =>
+            val log = getLog(topic, partitionId)
+            r.log = log
+          case Some(l) => // nothing to do since log already exists
+        }
+      case None =>
+        val partition = new Partition(topic, partitionId)
+        log match {
+          case Some(l) =>
+            val replica = new Replica(config.brokerId, partition, topic, log, l.getHighwaterMark, l.maxSize, true)
+            replicas += (topic, partitionId) -> replica
+            info("Added replica for topic %s partition %s on broker %d"
+              .format(replica.topic, replica.partition.partId, replica.brokerId))
+          case None =>
+            val replica = new Replica(config.brokerId, partition, topic, None, -1, -1, false)
+            replicas += (topic, partitionId) -> replica
+            info("Added replica for topic %s partition %s on broker %d"
+              .format(replica.topic, replica.partition.partId, replica.brokerId))
+        }
+    }
+    replicas.get((topic, partitionId)).get
+  }
+
   /* Attemps to delete all provided segments from a log and returns how many it was able to */
   private def deleteSegments(log: Log, segments: Seq[LogSegment]): Int = {
     var total = 0

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Wed Mar 21 17:29:32 2012
@@ -16,12 +16,11 @@
 */
 package kafka.producer
 
-import kafka.cluster.{Broker, Partition}
 import collection.mutable.HashMap
 import kafka.api.{TopicMetadataRequest, TopicMetadata}
 import java.lang.IllegalStateException
-import kafka.common.NoLeaderForPartitionException
 import kafka.utils.Logging
+import kafka.cluster.{Replica, Partition}
 
 class BrokerPartitionInfo(producerPool: ProducerPool) extends Logging {
   val topicPartitionInfo = new HashMap[String, TopicMetadata]()
@@ -33,29 +32,37 @@ class BrokerPartitionInfo(producerPool: 
    * @return a sequence of (brokerId, numPartitions). Returns a zero-length
    * sequence if no brokers are available.
    */  
-  def getBrokerPartitionInfo(topic: String): Seq[(Partition, Broker)] = {
+  def getBrokerPartitionInfo(topic: String): Seq[Partition] = {
+    debug("Getting broker partition info for topic %s".format(topic))
     // check if the cache has metadata for this topic
     val topicMetadata = topicPartitionInfo.get(topic)
     val metadata: TopicMetadata =
-    topicMetadata match {
-      case Some(m) => m
-      case None =>
-        // refresh the topic metadata cache
-        info("Fetching metadata for topic %s".format(topic))
-        updateInfo(topic)
-        val topicMetadata = topicPartitionInfo.get(topic)
-        topicMetadata match {
-          case Some(m) => m
-          case None => throw new IllegalStateException("Failed to fetch topic metadata for topic: " + topic)
-        }
-    }
+      topicMetadata match {
+        case Some(m) => m
+        case None =>
+          // refresh the topic metadata cache
+          info("Fetching metadata for topic %s".format(topic))
+          updateInfo(topic)
+          val topicMetadata = topicPartitionInfo.get(topic)
+          topicMetadata match {
+            case Some(m) => m
+            case None => throw new IllegalStateException("Failed to fetch topic metadata for topic: " + topic)
+          }
+      }
     val partitionMetadata = metadata.partitionsMetadata
     partitionMetadata.map { m =>
+      val partition = new Partition(topic, m.partitionId)
       m.leader match {
-        case Some(leader) => (new Partition(leader.id, m.partitionId, topic) -> leader)
-        case None =>  throw new NoLeaderForPartitionException("No leader for topic %s, partition %d".format(topic, m.partitionId))
+        case Some(leader) =>
+          val leaderReplica = new Replica(leader.id, partition, topic)
+          partition.leader = Some(leaderReplica)
+          debug("Topic %s partition %d has leader %d".format(topic, m.partitionId, leader.id))
+          partition
+        case None =>
+          debug("Topic %s partition %d does not have a leader yet".format(topic, m.partitionId))
+          partition
       }
-    }.sortWith((s, t) => s._1.partId < t._1.partId)
+    }.sortWith((s, t) => s.partId < t.partId)
   }
 
   /**
@@ -78,8 +85,8 @@ class BrokerPartitionInfo(producerPool: 
       // refresh cache for all topics
       val topics = topicPartitionInfo.keySet.toList
       val topicMetadata = producer.send(new TopicMetadataRequest(topics))
-      info("Fetched metadata for topics %s".format(topicMetadata.mkString(",")))
       topicMetadata.foreach(metadata => topicPartitionInfo += (metadata.topic -> metadata))
+
     }
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Wed Mar 21 17:29:32 2012
@@ -18,14 +18,14 @@
 package kafka.producer.async
 
 import kafka.api.{ProducerRequest, TopicData, PartitionData}
-import kafka.common.{FailedToSendMessageException, InvalidPartitionException, NoBrokersForPartitionException}
-import kafka.cluster.{Partition, Broker}
+import kafka.cluster.Partition
 import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
 import kafka.producer._
 import kafka.serializer.Encoder
-import kafka.utils.{Utils, Logging}
 import scala.collection.Map
 import scala.collection.mutable.{ListBuffer, HashMap}
+import kafka.common.{NoLeaderForPartitionException, InvalidPartitionException, NoBrokersForPartitionException}
+import kafka.utils.{Utils, Logging}
 
 class DefaultEventHandler[K,V](config: ProducerConfig,                               // this api is for testing
                                private val partitioner: Partitioner[K],              // use the other constructor
@@ -43,41 +43,37 @@ class DefaultEventHandler[K,V](config: P
   def handle(events: Seq[ProducerData[K,V]]) {
     lock synchronized {
      val serializedData = serialize(events)
-     handleSerializedData(serializedData, config.producerRetries)
+      var outstandingProduceRequests = serializedData
+      var remainingRetries = config.producerRetries
+      Stream.continually(dispatchSerializedData(outstandingProduceRequests))
+                        .takeWhile(requests => (remainingRetries > 0) && (requests.size > 0)).foreach {
+        currentOutstandingRequests =>
+          outstandingProduceRequests = currentOutstandingRequests
+          // back off and update the topic metadata cache before attempting another send operation
+          Thread.sleep(config.producerRetryBackoffMs)
+          brokerPartitionInfo.updateInfo()
+          remainingRetries -= 1
+      }
     }
   }
 
-  private def handleSerializedData(messages: Seq[ProducerData[K,Message]], requiredRetries: Int) {
+  private def dispatchSerializedData(messages: Seq[ProducerData[K,Message]]): Seq[ProducerData[K, Message]] = {
     val partitionedData = partitionAndCollate(messages)
-    for ( (brokerid, eventsPerBrokerMap) <- partitionedData ) {
-      if (logger.isTraceEnabled)
-        eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
-          .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
-      val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
-
-      try {
-        send(brokerid, messageSetPerBroker)
-      } catch {
-        case t =>
-          warn("error sending data to broker " + brokerid, t)
-          var numRetries = 0
-          val eventsPerBroker = new ListBuffer[ProducerData[K,Message]]
-          eventsPerBrokerMap.foreach(e => eventsPerBroker.appendAll(e._2))
-          while (numRetries < requiredRetries) {
-            numRetries +=1
-            Thread.sleep(config.producerRetryBackoffMs)
-            try {
-              brokerPartitionInfo.updateInfo()
-              handleSerializedData(eventsPerBroker, 0)
-              return
-            }
-            catch {
-              case t => warn("error sending data to broker " + brokerid + " in " + numRetries + " retry", t)
-            }
-          }
-          throw new FailedToSendMessageException("can't send data after " + numRetries + " retries", t)
+    val failedProduceRequests = new ListBuffer[ProducerData[K,Message]]
+    try {
+      for ( (brokerid, eventsPerBrokerMap) <- partitionedData ) {
+        if (logger.isTraceEnabled)
+          eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
+            .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
+        val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
+
+        if((brokerid < 0) || (!send(brokerid, messageSetPerBroker)))
+          failedProduceRequests.appendAll(eventsPerBrokerMap.map(r => r._2).flatten)
       }
+    }catch {
+      case t: Throwable => error("Failed to send messages")
     }
+    failedProduceRequests
   }
 
   def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = {
@@ -93,16 +89,22 @@ class DefaultEventHandler[K,V](config: P
       val partitionIndex = getPartition(event.getKey, totalNumPartitions)
       val brokerPartition = topicPartitionsList(partitionIndex)
 
+      val leaderBrokerId = brokerPartition.leader match {
+        case Some(leader) => leader.brokerId
+        case None => -1
+        // postpone the failure until the send operation, so that requests for other brokers are handled correctly
+      }
+
       var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
-      ret.get(brokerPartition._2.id) match {
+      ret.get(leaderBrokerId) match {
         case Some(element) =>
           dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]]
         case None =>
           dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]]
-          ret.put(brokerPartition._2.id, dataPerBroker)
+          ret.put(leaderBrokerId, dataPerBroker)
       }
 
-      val topicAndPartition = (event.getTopic, brokerPartition._1.partId)
+      val topicAndPartition = (event.getTopic, brokerPartition.partId)
       var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
       dataPerBroker.get(topicAndPartition) match {
         case Some(element) =>
@@ -116,10 +118,11 @@ class DefaultEventHandler[K,V](config: P
     ret
   }
 
-  private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[(Partition, Broker)] = {
+  private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[Partition] = {
     debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
     val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic)
-    debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList)
+    debug("Broker partitions registered for topic: %s are %s"
+      .format(pd.getTopic, topicPartitionsList.map(p => p.partId).mkString(",")))
     val totalNumPartitions = topicPartitionsList.length
     if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
     topicPartitionsList
@@ -150,23 +153,30 @@ class DefaultEventHandler[K,V](config: P
    * @param brokerId the broker that will receive the request
    * @param messagesPerTopic the messages as a map from (topic, partition) -> messages
    */
-  private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]) {
-    if(messagesPerTopic.size > 0) {
-      val topics = new HashMap[String, ListBuffer[PartitionData]]()
-      for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) {
-        topics.get(topicName) match {
-          case Some(x) => trace("found " + topicName)
-          case None => topics += topicName -> new ListBuffer[PartitionData]() //create a new listbuffer for this topic
+  private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]): Boolean = {
+    try {
+      if(brokerId < 0)
+        throw new NoLeaderForPartitionException("No leader for some partition(s) on broker %d".format(brokerId))
+      if(messagesPerTopic.size > 0) {
+        val topics = new HashMap[String, ListBuffer[PartitionData]]()
+        for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) {
+          topics.get(topicName) match {
+            case Some(x) => trace("found " + topicName)
+            case None => topics += topicName -> new ListBuffer[PartitionData]() //create a new listbuffer for this topic
+          }
+          topics(topicName).append(new PartitionData(partitionId, messagesSet))
         }
-	      topics(topicName).append(new PartitionData(partitionId, messagesSet))
+        val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray))
+        val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, config.ackTimeout, topicData.toArray)
+        val syncProducer = producerPool.getProducer(brokerId)
+        val response = syncProducer.send(producerRequest)
+        // TODO: possibly send response to response callback handler
+        trace("kafka producer sent messages for topics %s to broker %d on %s:%d"
+          .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
       }
-      val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray))
-      val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, config.ackTimeout, topicData.toArray)
-      val syncProducer = producerPool.getProducer(brokerId)
-      val response = syncProducer.send(producerRequest)
-      // TODO: possibly send response to response callback handler
-      trace("kafka producer sent messages for topics %s to broker %s:%d"
-        .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port))
+      true
+    }catch {
+      case t: Throwable => false
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Wed Mar 21 17:29:32 2012
@@ -63,7 +63,11 @@ class ProducerSendThread[K,V](val thread
         // returns a null object
         val expired = currentQueueItem == null
         if(currentQueueItem != null) {
-          trace("Dequeued item for topic %s, partition key: %s, data: %s"
+          if(currentQueueItem.getKey == null)
+            trace("Dequeued item for topic %s, no partition key, data: %s"
+              .format(currentQueueItem.getTopic, currentQueueItem.getData.toString))
+          else
+            trace("Dequeued item for topic %s, partition key: %s, data: %s"
               .format(currentQueueItem.getTopic, currentQueueItem.getKey.toString, currentQueueItem.getData.toString))
           events += currentQueueItem
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Wed Mar 21 17:29:32 2012
@@ -130,7 +130,7 @@ class KafkaApis(val logManager: LogManag
     try {
       trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
       val log = logManager.getLog(topic, partition)
-      response = Right(if(log != null) log.read(offset, maxSize) else MessageSet.Empty)
+      response = Right(log match { case Some(l) => l.read(offset, maxSize) case None => MessageSet.Empty })
     } catch {
       case e =>
         error("error when processing request " + (topic, partition, offset, maxSize), e)
@@ -168,7 +168,7 @@ class KafkaApis(val logManager: LogManag
           if(config.autoCreateTopics) {
             CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions,
               config.defaultReplicationFactor)
-            info("Auto creation of topic %s with partitions %d and replication factor %d is successful!"
+            info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
               .format(topic, config.numPartitions, config.defaultReplicationFactor))
             val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
             newTopicMetadata match {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Wed Mar 21 17:29:32 2012
@@ -94,6 +94,15 @@ class KafkaConfig(props: Properties) ext
   /* enable auto creation of topic on the server */
   val autoCreateTopics = Utils.getBoolean(props, "auto.create.topics", true)
 
+  /**
+   * Following properties are relevant to Kafka replication
+   */
+
   /* default replication factors for automatically created topics */
   val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1)
-}
+
+  /* wait time in ms to allow the preferred replica for a partition to become the leader. This property is used during
+  * leader election on all replicas minus the preferred replica */
+  val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
+
+ }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Wed Mar 21 17:29:32 2012
@@ -20,9 +20,9 @@ package kafka.server
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import java.io.File
-import kafka.utils.{Mx4jLoader, Utils, SystemTime, Logging}
 import kafka.network.{SocketServerStats, SocketServer}
 import kafka.log.LogManager
+import kafka.utils._
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
@@ -72,6 +72,11 @@ class KafkaServer(val config: KafkaConfi
      *  So this should happen after socket server start.
      */
     logManager.startup
+
+    // starting relevant replicas and leader election for partitions assigned to this broker
+    // TODO: Some part of the broker startup logic is hidden inside KafkaZookeeper, but some of it has to be done here
+    // since it requires the log manager to come up. Ideally log manager should not hide KafkaZookeeper inside it
+    logManager.kafkaZookeeper.startReplicasForTopics(ZkUtils.getAllTopics(logManager.getZookeeperClient))
     info("Server started.")
   }
   
@@ -82,7 +87,7 @@ class KafkaServer(val config: KafkaConfi
   def shutdown() {
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
-      info("Shutting down Kafka server")
+      info("Shutting down Kafka server with id " + config.brokerId)
       if (socketServer != null)
         socketServer.shutdown()
       if(requestHandlerPool != null)
@@ -108,3 +113,5 @@ class KafkaServer(val config: KafkaConfi
 
   def getStats(): SocketServerStats = socketServer.stats
 }
+
+

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Wed Mar 21 17:29:32 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -18,30 +18,37 @@
 package kafka.server
 
 import kafka.utils._
-import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import kafka.log.LogManager
 import java.net.InetAddress
 import kafka.common.KafkaZookeeperClient
+import kafka.cluster.Replica
+import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
 
 /**
  * Handles the server's interaction with zookeeper. The server needs to register the following paths:
  *   /topics/[topic]/[node_id-partition_num]
  *   /brokers/[0...N] --> host:port
- * 
+ *
  */
 class KafkaZooKeeper(config: KafkaConfig, logManager: LogManager) extends Logging {
-  
+
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
   var zkClient: ZkClient = null
   var topics: List[String] = Nil
   val lock = new Object()
-  
+  var existingTopics: Set[String] = Set.empty[String]
+  val leaderChangeListener = new LeaderChangeListener
+  val topicPartitionsChangeListener = new TopicChangeListener
+  private val topicListenerLock = new Object
+  private val leaderChangeLock = new Object
+
   def startup() {
     /* start client */
     info("connecting to ZK: " + config.zkConnect)
     zkClient = KafkaZookeeperClient.getZookeeperClient(config)
     zkClient.subscribeStateChanges(new SessionExpireListener)
+    subscribeToTopicAndPartitionsChanges
   }
 
   def registerBrokerInZk() {
@@ -73,6 +80,12 @@ class KafkaZooKeeper(config: KafkaConfig
       info("re-registering broker info in ZK for broker " + config.brokerId)
       registerBrokerInZk()
       info("done re-registering broker")
+      info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
+      zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
+      val topics = ZkUtils.getAllTopics(zkClient)
+      debug("Existing topics are %s".format(topics.mkString(",")))
+      topics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), topicPartitionsChangeListener))
+      handleNewTopics(topics)
     }
   }
 
@@ -81,6 +94,169 @@ class KafkaZooKeeper(config: KafkaConfig
       info("Closing zookeeper client...")
       zkClient.close()
     }
-  } 
-  
+  }
+
+  def handleNewTopics(topics: Seq[String]) {
+    // get relevant partitions to this broker
+    val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
+    topicsAndPartitionsOnThisBroker.foreach { tp =>
+      val topic = tp._1
+      val partitionsAssignedToThisBroker = tp._2
+      // subscribe to leader changes for these partitions
+      subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
+      // start replicas for these partitions
+      startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
+    }
+  }
+
+  def handleNewPartitions(topic: String, partitions: Seq[Int]) {
+    info("Handling topic %s partitions %s".format(topic, partitions.mkString(",")))
+    // find the partitions relevant to this broker
+    val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topic, partitions, config.brokerId)
+    info("Partitions assigned to broker %d for topic %s are %s"
+      .format(config.brokerId, topic, partitionsAssignedToThisBroker.mkString(",")))
+
+    // subscribe to leader changes for these partitions
+    subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
+    // start replicas for these partitions
+    startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
+  }
+
+  def subscribeToTopicAndPartitionsChanges {
+    info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
+    zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
+    val topics = ZkUtils.getAllTopics(zkClient)
+    debug("Existing topics are %s".format(topics.mkString(",")))
+    topics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), topicPartitionsChangeListener))
+
+    val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
+    debug("Partitions assigned to broker %d are %s".format(config.brokerId, partitionsAssignedToThisBroker.mkString(",")))
+    partitionsAssignedToThisBroker.foreach { tp =>
+      val topic = tp._1
+      val partitions = tp._2.map(p => p.toInt)
+      partitions.foreach { partition =>
+          // register leader change listener
+          zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener)
+      }
+    }
+  }
+
+  private def subscribeToLeaderForPartitions(topic: String, partitions: Seq[Int]) {
+    partitions.foreach { partition =>
+      info("Broker %d subscribing to leader changes for topic %s partition %d".format(config.brokerId, topic, partition))
+      // register leader change listener
+      zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener)
+    }
+  }
+
+  def startReplicasForTopics(topics: Seq[String]) {
+    val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
+    partitionsAssignedToThisBroker.foreach(tp => startReplicasForPartitions(tp._1, tp._2))
+  }
+
+  private def startReplicasForPartitions(topic: String, partitions: Seq[Int]) {
+    partitions.foreach { partition =>
+      val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
+      if(assignedReplicas.contains(config.brokerId)) {
+        val replica = logManager.addReplicaForPartition(topic, partition)
+        startReplica(replica)
+      } else
+        warn("Ignoring partition %d of topic %s since broker %d doesn't host any replicas for it"
+          .format(partition, topic, config.brokerId))
+    }
+  }
+
+  private def startReplica(replica: Replica) {
+    info("Starting replica for topic %s partition %d on broker %d".format(replica.topic, replica.partition.partId, replica.brokerId))
+    replica.log match {
+      case Some(log) =>  // log is already started
+      case None =>
+      // TODO: Add log recovery upto the last checkpointed HW as part of KAFKA-46
+    }
+    ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partId) match {
+      case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partId, leader))
+      case None => // leader election
+        leaderElection(replica)
+
+    }
+  }
+
+  def leaderElection(replica: Replica) {
+    info("Broker %d electing leader for topic %s partition %d".format(config.brokerId, replica.topic, replica.partition.partId))
+    // read the AR list for replica.partition from ZK
+    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partId).map(r => r.toInt)
+    // TODO: read the ISR as part of KAFKA-302
+    if(assignedReplicas.contains(replica.brokerId)) {
+      // wait for some time if it is not the preferred replica
+      try {
+        if(replica.brokerId != assignedReplicas.head)
+          Thread.sleep(config.preferredReplicaWaitTime)
+      }catch {
+        case e => // ignoring
+      }
+      if(ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId)) {
+        info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId))
+        // TODO: Become leader as part of KAFKA-302
+      }
+    }
+  }
+
+  class TopicChangeListener extends IZkChildListener with Logging {
+
+    @throws(classOf[Exception])
+    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+      topicListenerLock.synchronized {
+        debug("Topic/partition change listener fired for path " + parentPath)
+        import scala.collection.JavaConversions._
+        val currentChildren = asBuffer(curChilds)
+        // check if topic has changed or a partition for an existing topic has changed
+        if(parentPath == ZkUtils.BrokerTopicsPath) {
+          val currentTopics = currentChildren
+          debug("New topics " + currentTopics.mkString(","))
+          // for each new topic [topic], watch the path /brokers/topics/[topic]/partitions
+          currentTopics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), this))
+          handleNewTopics(currentTopics)
+        }else {
+          val topic = parentPath.split("/").takeRight(2).head
+          debug("Partitions changed for topic %s on broker %d with new value %s"
+            .format(topic, config.brokerId, currentChildren.mkString(",")))
+          handleNewPartitions(topic, currentChildren.map(p => p.toInt).toSeq)
+        }
+      }
+    }
+  }
+
+  class LeaderChangeListener extends IZkDataListener with Logging {
+
+    @throws(classOf[Exception])
+    def handleDataChange(dataPath: String, data: Object) {
+      // handle leader change event for path
+      val newLeader: String = data.asInstanceOf[String]
+      debug("Leader change listener fired for path %s. New leader is %s".format(dataPath, newLeader))
+      // TODO: update the leader in the list of replicas maintained by the log manager
+    }
+
+    @throws(classOf[Exception])
+    def handleDataDeleted(dataPath: String) {
+      leaderChangeLock.synchronized {
+        // leader is deleted for topic partition
+        val topic = dataPath.split("/").takeRight(4).head
+        val partitionId = dataPath.split("/").takeRight(2).head.toInt
+        debug("Leader deleted listener fired for topic %s partition %d on broker %d"
+          .format(topic, partitionId, config.brokerId))
+        val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionId).map(r => r.toInt)
+        if(assignedReplicas.contains(config.brokerId)) {
+          val replica = logManager.getReplicaForPartition(topic, partitionId)
+          replica match {
+            case Some(r) => leaderElection(r)
+            case None =>  error("No replica exists for topic %s partition %s on broker %d"
+              .format(topic, partitionId, config.brokerId))
+          }
+        }
+      }
+    }
+  }
 }
+
+
+

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Wed Mar 21 17:29:32 2012
@@ -246,6 +246,22 @@ object Utils extends Logging {
     else value
   }
 
+  def getLong(props: Properties, name: String, default: Long): Long =
+    getLongInRange(props, name, default, (Long.MinValue, Long.MaxValue))
+
+  def getLongInRange(props: Properties, name: String, default: Long, range: (Long, Long)): Long = {
+    val v =
+      if(props.containsKey(name))
+        props.getProperty(name).toInt
+      else
+        default
+    if(v < range._1 || v > range._2)
+      throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".")
+    else
+      v
+  }
+
+
   def getLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = {
     val value = buffer.getLong
     if(value < range._1 || value > range._2)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Wed Mar 21 17:29:32 2012
@@ -17,13 +17,14 @@
 
 package kafka.utils
 
-import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import kafka.cluster.{Broker, Cluster}
 import scala.collection._
 import java.util.Properties
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
 import kafka.consumer.TopicCount
+import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
+import java.util.concurrent.locks.Condition
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -68,15 +69,9 @@ object ZkUtils extends Logging {
   }
 
   def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = {
-    // TODO: When leader election is implemented, change this method to return the leader as follows
-    // until then, assume the first replica as the leader
-//    val leader = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
-    val replicaListString = readDataMaybeNull(zkClient, getTopicPartitionReplicasPath(topic, partition.toString))
-    val replicas = Utils.getCSVList(replicaListString)
-    replicas.size match {
-      case 0 => None
-      case _ => Some(replicas.head.toInt)
-    }
+    val leader = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
+    if(leader == null) None
+    else Some(leader.toInt)
   }
 
   def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = {
@@ -94,6 +89,16 @@ object ZkUtils extends Logging {
     replicas.contains(brokerId.toString)
   }
 
+  def tryToBecomeLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
+    try {
+      createEphemeralPathExpectConflict(zkClient, getTopicPartitionLeaderPath(topic, partition.toString), brokerId.toString)
+      true
+    } catch {
+      case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); false
+      case oe => false
+    }
+  }
+
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val broker = new Broker(id, creator, host, port)
@@ -317,6 +322,27 @@ object ZkUtils extends Logging {
     ret
   }
 
+  def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[String, Seq[Int]] = {
+    val topicsAndPartitions = getPartitionsForTopics(zkClient, topics.iterator)
+
+    topicsAndPartitions.map { tp =>
+      val topic = tp._1
+      val partitions = tp._2.map(p => p.toInt)
+      val relevantPartitions = partitions.filter { partition =>
+        val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
+        assignedReplicas.contains(brokerId)
+      }
+      (topic -> relevantPartitions)
+    }
+  }
+
+  def getPartitionsAssignedToBroker(zkClient: ZkClient, topic: String, partitions: Seq[Int], broker: Int): Seq[Int] = {
+    partitions.filter { p =>
+      val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, p).map(r => r.toInt)
+      assignedReplicas.contains(broker)
+    }
+  }
+
   def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
     val brokerIdPath = BrokerIdsPath + "/" + brokerId
     zkClient.delete(brokerIdPath)
@@ -372,6 +398,29 @@ object ZkUtils extends Logging {
 
   def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] =
     brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)) )
+
+  def getAllTopics(zkClient: ZkClient): Seq[String] = {
+    val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
+    if(topics == null) Seq.empty[String]
+    else topics
+  }
+
+}
+
+class LeaderExists(topic: String, partition: Int, leaderExists: Condition) extends IZkDataListener {
+  @throws(classOf[Exception])
+  def handleDataChange(dataPath: String, data: Object) {
+    val t = dataPath.split("/").takeRight(3).head
+    val p = dataPath.split("/").takeRight(2).head.toInt
+    if(t == topic && p == partition)
+      leaderExists.signal()
+  }
+
+  @throws(classOf[Exception])
+  def handleDataDeleted(dataPath: String) {
+    leaderExists.signal()
+  }
+
 }
 
 object ZKStringSerializer extends ZkSerializer {

Modified: incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties (original)
+++ incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties Wed Mar 21 17:29:32 2012
@@ -18,8 +18,8 @@ log4j.appender.stdout=org.apache.log4j.C
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=OFF
+log4j.logger.kafka=INFO
 
 # zkclient can be verbose, during debugging it is common to adjust is separately
-log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
-log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.org.I0Itec.zkclient.ZkClient=OFF
+log4j.logger.org.apache.zookeeper=OFF

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Wed Mar 21 17:29:32 2012
@@ -169,8 +169,6 @@ class AdminTest extends JUnit3Suite with
       case Some(metadata) => assertEquals(topic, metadata.topic)
         assertNotNull("partition metadata list cannot be null", metadata.partitionsMetadata)
         assertEquals("partition metadata list length should be 2", 2, metadata.partitionsMetadata.size)
-        assertEquals("leader of partition 0 should be 0", 0, metadata.partitionsMetadata.head.leader.get.id)
-        assertEquals("leader of partition 1 should be 1", 1, metadata.partitionsMetadata.last.leader.get.id)
         val actualReplicaAssignment = metadata.partitionsMetadata.map(p => p.replicas)
         val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
         assertEquals(expectedReplicaAssignment.toList, actualReplicaList)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala Wed Mar 21 17:29:32 2012
@@ -43,7 +43,7 @@ class TopicCountTest extends JUnitSuite 
 
   @Test
   def testPartition() {
-    assertTrue(new Partition(10, 0) == new Partition(10, 0))
-    assertTrue(new Partition(10, 1) != new Partition(10, 0))
+    assertTrue(new Partition("foo", 10) == new Partition("foo", 10))
+    assertTrue(new Partition("foo", 1) != new Partition("foo", 0))
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Wed Mar 21 17:29:32 2012
@@ -31,6 +31,7 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.utils._
 import kafka.producer.{ProducerConfig, ProducerData, Producer}
 import java.util.{Collections, Properties}
+import kafka.utils.TestUtils._
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
@@ -51,10 +52,12 @@ class ZookeeperConsumerConnectorTest ext
   val consumer2 = "consumer2"
   val consumer3 = "consumer3"
   val nMessages = 2
+  var zkClient: ZkClient = null
 
   override def setUp() {
     super.setUp()
     dirs = new ZKGroupTopicDirs(group, topic)
+    zkClient = new ZkClient(zkConnect, 6000, 3000, ZKStringSerializer)
   }
 
   def testBasic() {
@@ -94,6 +97,10 @@ class ZookeeperConsumerConnectorTest ext
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
+
+    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
+    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1.size, receivedMessages1.size)
     assertEquals(sentMessages1, receivedMessages1)
@@ -102,7 +109,6 @@ class ZookeeperConsumerConnectorTest ext
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
     val expected_1 = List( ("0", "group1_consumer1-0"),
                            ("1", "group1_consumer1-0"))
-//   assertEquals(expected_1, actual_1)
     assertEquals(expected_1, actual_1)
 
     // commit consumed offsets
@@ -118,7 +124,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
     val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    Thread.sleep(200)
+    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
+    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
 
     val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -141,7 +148,10 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
     val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
     val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
-    Thread.sleep(200)
+
+    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
+    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+
     val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
     val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
@@ -168,6 +178,9 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
 
+    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
+    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@@ -196,7 +209,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    Thread.sleep(200)
+    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
+    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
 
     val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -219,7 +233,10 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec)
     val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
-    Thread.sleep(200)
+
+    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
+    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+
     val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
     val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
@@ -300,11 +317,15 @@ class ZookeeperConsumerConnectorTest ext
 
     val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
 
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+
     val zkConsumerConnector =
       new ZookeeperConsumerConnector(consumerConfig, true)
     val topicMessageStreams =
       zkConsumerConnector.createMessageStreams(Predef.Map(topic -> 1), new StringDecoder)
 
+
     var receivedMessages: List[String] = Nil
     for ((topic, messageStreams) <- topicMessageStreams) {
       for (messageStream <- messageStreams) {
@@ -341,10 +362,10 @@ class ZookeeperConsumerConnectorTest ext
     val topicRegistry = zkConsumerConnector1.getTopicRegistry
     assertEquals(1, topicRegistry.map(r => r._1).size)
     assertEquals(topic, topicRegistry.map(r => r._1).head)
-    val topicsAndPartitionsInRegistry = topicRegistry.map(r => (r._1, r._2.map(p => p._1)))
+    val topicsAndPartitionsInRegistry = topicRegistry.map(r => (r._1, r._2.map(p => p._2)))
     val brokerPartition = topicsAndPartitionsInRegistry.head._2.head
     assertEquals(0, brokerPartition.brokerId)
-    assertEquals(0, brokerPartition.partId)
+    assertEquals(0, brokerPartition.partitionId)
 
     // also check partition ownership
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Wed Mar 21 17:29:32 2012
@@ -27,8 +27,9 @@ import kafka.message._
 import kafka.server._
 import org.scalatest.junit.JUnit3Suite
 import kafka.integration.KafkaServerTestHarness
-import kafka.utils.TestUtils
 import kafka.producer.{ProducerData, Producer}
+import kafka.utils.TestUtils
+import kafka.utils.TestUtils._
 
 class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
 
@@ -43,7 +44,7 @@ class FetcherTest extends JUnit3Suite wi
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
   val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
                                                       c.brokerId,
-                                                      new Partition(c.brokerId, 0), 
+                                                      0,
                                                       queue, 
                                                       new AtomicLong(0), 
                                                       new AtomicLong(0), 
@@ -66,6 +67,7 @@ class FetcherTest extends JUnit3Suite wi
   def testFetcher() {
     val perNode = 2
     var count = sendMessages(perNode)
+    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
     fetch(count)
     Thread.sleep(100)
     assertQueueEmpty()

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Wed Mar 21 17:29:32 2012
@@ -21,11 +21,11 @@ import kafka.api.FetchRequestBuilder
 import kafka.common.OffsetOutOfRangeException
 import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
-import kafka.utils.TestUtils
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 import kafka.producer.ProducerData
+import kafka.utils.TestUtils
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -61,6 +61,7 @@ class LazyInitProducerTest extends JUnit
     val producerData = new ProducerData[String, Message](topic, topic, sentMessages)
 
     producer.send(producerData)
+
     var fetchedMessage: ByteBufferMessageSet = null
     while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
       val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala Wed Mar 21 17:29:32 2012
@@ -39,6 +39,8 @@ trait ProducerConsumerTestHarness extend
       props.put("buffer.size", "65536")
       props.put("connect.timeout.ms", "100000")
       props.put("reconnect.interval", "10000")
+      props.put("producer.retry.backoff.ms", "1000")
+      props.put("producer.num.retries", "3")
       producer = new Producer(new ProducerConfig(props))
       consumer = new SimpleConsumer(host,
                                    port,

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Wed Mar 21 17:29:32 2012
@@ -20,14 +20,14 @@ package kafka.javaapi.consumer
 import junit.framework.Assert._
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
-import kafka.utils.{Utils, Logging}
-import kafka.utils.TestUtils
 import org.scalatest.junit.JUnit3Suite
 import scala.collection.JavaConversions._
 import kafka.consumer.{ConsumerConfig, KafkaMessageStream}
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
 import kafka.javaapi.producer.{ProducerData, Producer}
+import kafka.utils.{Utils, Logging, TestUtils}
+import kafka.utils.TestUtils._
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
@@ -52,11 +52,15 @@ class ZookeeperConsumerConnectorTest ext
 
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1")
+
+    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
+    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+
     // create a consumer
-    val consumerConfig1 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
+    val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
+
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1, receivedMessages1)
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala Wed Mar 21 17:29:32 2012
@@ -26,7 +26,7 @@ import kafka.message.Message
 import kafka.producer.async.MissingConfigException
 import kafka.serializer.Encoder
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.zk.{EmbeddedZookeeper, ZooKeeperTestHarness}
+import kafka.zk.ZooKeeperTestHarness
 import org.apache.log4j.spi.LoggingEvent
 import org.apache.log4j.{PropertyConfigurator, Logger}
 import org.junit.{After, Before, Test}
@@ -36,22 +36,16 @@ import kafka.utils._
 class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
   var logDirZk: File = null
-  var logDirBl: File = null
-  var serverBl: KafkaServer = null
   var serverZk: KafkaServer = null
 
   var simpleConsumerZk: SimpleConsumer = null
-  var simpleConsumerBl: SimpleConsumer = null
 
   val tLogger = Logger.getLogger(getClass())
 
   private val brokerZk = 0
-  private val brokerBl = 1
 
   private val ports = TestUtils.choosePorts(2)
-  private val (portZk, portBl) = (ports(0), ports(1))
-
-  private var zkServer:EmbeddedZookeeper = null
+  private val portZk = ports(0)
 
   @Before
   override def setUp() {
@@ -62,26 +56,17 @@ class KafkaLog4jAppenderTest extends JUn
     logDirZk = new File(logDirZkPath)
     serverZk = TestUtils.createServer(new KafkaConfig(propsZk));
 
-    val propsBl: Properties = TestUtils.createBrokerConfig(brokerBl, portBl)
-    val logDirBlPath = propsBl.getProperty("log.dir")
-    logDirBl = new File(logDirBlPath)
-    serverBl = TestUtils.createServer(new KafkaConfig(propsBl))
-
     Thread.sleep(100)
 
     simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024)
-    simpleConsumerBl = new SimpleConsumer("localhost", portBl, 1000000, 64*1024)
   }
 
   @After
   override def tearDown() {
     simpleConsumerZk.close
-    simpleConsumerBl.close
 
     serverZk.shutdown
-    serverBl.shutdown
     Utils.rm(logDirZk)
-    Utils.rm(logDirBl)
 
     Thread.sleep(500)
     super.tearDown()
@@ -174,13 +159,6 @@ class KafkaLog4jAppenderTest extends JUn
       count = count + 1
     }
 
-    val response2 = simpleConsumerBl.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build())
-    val messagesFromOtherBroker = response2.messageSet("test-topic", 0)
-
-    for(message <- messagesFromOtherBroker) {
-      count = count + 1
-    }
-
     assertEquals(5, count)
   }
 
@@ -195,11 +173,6 @@ class KafkaLog4jAppenderTest extends JUn
     props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
     props
   }
-
-  private def getLogDir(): File = {
-    val dir = TestUtils.tempDir()
-    dir
-  }
 }
 
 class AppenderStringEncoder extends Encoder[LoggingEvent] {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Wed Mar 21 17:29:32 2012
@@ -124,10 +124,58 @@ class ProducerTest extends JUnit3Suite w
     producer.close
   }
 
+//  @Test
+//  def testZKSendWithDeadBroker() {
+//    val props = new Properties()
+//    props.put("serializer.class", "kafka.serializer.StringEncoder")
+//    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
+//    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+//
+//    // create topic
+//    CreateTopicCommand.createTopic(zkClient, "new-topic", 2, 1, "0,0")
+//
+//    val config = new ProducerConfig(props)
+//
+//    val producer = new Producer[String, String](config)
+//    val message = new Message("test1".getBytes)
+//    try {
+////      // kill 2nd broker
+////      server1.shutdown
+////      Thread.sleep(100)
+//
+//      // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
+//      // all partitions have broker 0 as the leader.
+//      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+//      Thread.sleep(100)
+//
+//      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+//      Thread.sleep(3000)
+//
+//      // restart server 1
+////      server1.startup()
+////      Thread.sleep(100)
+//
+//      // cross check if brokers got the messages
+//      val response = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+//      val messageSet = response.messageSet("new-topic", 0).iterator
+//      var numMessagesReceived = 0
+//      while(messageSet.hasNext) {
+//        val messageAndOffset = messageSet.next()
+//        assertEquals(message, messageSet.next.message)
+//        println("Received message at offset %d".format(messageAndOffset.offset))
+//        numMessagesReceived += 1
+//      }
+//      assertEquals("Message set should have 2 messages", 2, numMessagesReceived)
+//    } catch {
+//      case e: Exception => fail("Not expected", e)
+//    }
+//    producer.close
+//  }
+
   // TODO: Need to rewrite when SyncProducer changes to throw timeout exceptions
   //       and when leader logic is changed.
-  @Test
-  def testZKSendWithDeadBroker() {
+//  @Test
+//  def testZKSendWithDeadBroker2() {
 //    val props = new Properties()
 //    props.put("serializer.class", "kafka.serializer.StringEncoder")
 //    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
@@ -172,7 +220,7 @@ class ProducerTest extends JUnit3Suite w
 //      case e: Exception => fail("Not expected", e)
 //    }
 //    producer.close
-  }
+//  }
 
   @Test
   def testZKSendToExistingTopicWithNoBrokers() {
@@ -227,7 +275,7 @@ class ProducerTest extends JUnit3Suite w
     } catch {
       case e: Exception => fail("Not expected", e)
     } finally {
-      server.shutdown
+      if(server != null) server.shutdown
       producer.close
     }
   }



Mime
View raw message