kafka-commits mailing list archives

From: nehanarkh...@apache.org
Subject: svn commit: r1295861 [1/3] - in /incubator/kafka/branches/0.8: core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/src/main/scala/...
Date: Thu, 01 Mar 2012 21:15:28 GMT
Author: nehanarkhede
Date: Thu Mar  1 21:15:26 2012
New Revision: 1295861

URL: http://svn.apache.org/viewvc?rev=1295861&view=rev
Log:
KAFKA-239 Refactoring code to wire new ZK data structures and making partitions logical; patched by Neha Narkhede; reviewed by Jun Rao
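
The refactor replaces the physical "brokerId-partId" partition naming with
topic-scoped logical partitions whose leader is recorded in ZooKeeper. A
minimal sketch of the resulting lookup shape, with the ZooKeeper read stubbed
out as a function argument (the real code uses ZkUtils.getLeaderForPartition
and throws the NoLeaderForPartitionException added below):

    // Sketch only: resolve a logical partition to its current leader.
    case class Partition(brokerId: Int, partId: Int, topic: String = "")

    def resolveLeader(topic: String, partId: Int,
                      leaderInZk: (String, Int) => Option[Int]): Partition =
      leaderInZk(topic, partId) match {
        case Some(leader) => Partition(leader, partId, topic)
        case None => throw new RuntimeException(  // NoLeaderForPartitionException in the real code
          "No leader for topic %s, partition %d".format(topic, partId))
      }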

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NoLeaderForPartitionException.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Thu Mar  1 21:15:26 2012
@@ -80,6 +80,10 @@ object AdminUtils extends Logging {
       for (i <- 0 until replicaAssignmentList.size) {
         val zkPath = ZkUtils.getTopicPartitionReplicasPath(topic, i.toString)
         ZkUtils.updatePersistentPath(zkClient, zkPath, Utils.seqToCSV(replicaAssignmentList(i)))
+        // TODO: Remove this with leader election patch
+        // assign leader for the partition i
+//        ZkUtils.updateEphemeralPath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, i.toString),
+//          replicaAssignmentList(i).head)
         debug("Updated path %s with %s for replica assignment".format(zkPath, Utils.seqToCSV(replicaAssignmentList(i))))
       }
     }
@@ -103,18 +107,19 @@ object AdminUtils extends Logging {
         for (i <-0 until partitionMetadata.size) {
           val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partitions(i).toString))
           val inSyncReplicas = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionInSyncPath(topic, partitions(i).toString))
-          val leader = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitions(i).toString))
+          val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitions(i))
           debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
 
           partitionMetadata(i) = new PartitionMetadata(partitions(i),
-            if (leader == null) None else Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(leader.toInt)).head),
+            leader match { case None => None case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l.toInt)).head) },
             getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(replicas).map(id => id.toInt)),
             getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(inSyncReplicas).map(id => id.toInt)),
             None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
         }
         Some(new TopicMetadata(topic, partitionMetadata))
-      } else
+      } else {
         None
+      }
     }
 
     metadataList.toList
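
As an aside, the explicit Option pattern match on leader above can be
collapsed with Option.map, which passes None through untouched:

    // Equivalent to the match: map over the Option; None falls through.
    val leader: Option[String] = Some("1")
    val brokerId: Option[Int] = leader.map(_.toInt)   // Some(1); None stays None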

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala Thu Mar  1 21:15:26 2012
@@ -18,10 +18,10 @@
 package kafka.admin
 
 import joptsimple.OptionParser
-import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{Logging, Utils, ZKStringSerializer, ZkUtils}
 
-object CreateTopicCommand {
+object CreateTopicCommand extends Logging {
 
   def main(args: Array[String]): Unit = {
     val parser = new OptionParser
@@ -91,6 +91,7 @@ object CreateTopicCommand {
       replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
     else
       replicaAssignment = getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
+    debug("Replica assignment list for %s is %s".format(topic, replicaAssignment))
     AdminUtils.createReplicaAssignmentPathInZK(topic, replicaAssignment, zkClient)
   }
 
@@ -104,8 +105,8 @@ object CreateTopicCommand {
       if (brokerList.size != brokerList.toSet.size)
         throw new AdministrationException("duplicate brokers in replica assignment: " + brokerList)
       if (!brokerList.toSet.subsetOf(availableBrokerList))
-        throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList +
-                "available broker:" + availableBrokerList)
+        throw new AdministrationException("some specified brokers are not available. specified brokers: " + brokerList.toString +
+                ", available brokers: " + availableBrokerList.toString)
       ret(i) = brokerList.toList
       if (ret(i).size != ret(0).size)
         throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala Thu Mar  1 21:15:26 2012
@@ -148,8 +148,8 @@ object PartitionMetadata {
   }
 }
 
-case class PartitionMetadata(partitionId: Int, leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker],
-                             logMetadata: Option[LogMetadata]) {
+case class PartitionMetadata(partitionId: Int, leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty,
+                             logMetadata: Option[LogMetadata] = None) {
   def sizeInBytes: Int = {
     var size: Int = 4 /* partition id */ + 1 /* if leader exists*/
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala Thu Mar  1 21:15:26 2012
@@ -72,6 +72,7 @@ object TopicMetadataRequest {
   def serializeTopicMetadata(topicMetadata: Seq[TopicMetadata]): ByteBuffer = {
     val size = topicMetadata.foldLeft(4 /* num topics */)(_ + _.sizeInBytes)
     val buffer = ByteBuffer.allocate(size)
+    debug("Allocating buffer of size %d for topic metadata response".format(size))
     /* number of topics */
     buffer.putInt(topicMetadata.size)
     /* topic partition_metadata */
@@ -122,13 +123,16 @@ case class TopicMetadataRequest(val topi
 }
 
 class TopicMetadataSend(topicsMetadata: Seq[TopicMetadata]) extends Send {
-  private var size: Int = topicsMetadata.foldLeft(0)(_ + _.sizeInBytes)
+  private var size: Int = topicsMetadata.foldLeft(4)(_ + _.sizeInBytes)
   private val header = ByteBuffer.allocate(6)
-  val metadata = TopicMetadataRequest.serializeTopicMetadata(topicsMetadata)
   header.putInt(size + 2)
   header.putShort(ErrorMapping.NoError.asInstanceOf[Short])
   header.rewind()
 
+  val metadata = TopicMetadataRequest.serializeTopicMetadata(topicsMetadata)
+  metadata.rewind()
+
+  trace("Wrote size %d in header".format(size + 2))
   var complete: Boolean = false
 
   def writeTo(channel: GatheringByteChannel): Int = {
@@ -136,9 +140,13 @@ class TopicMetadataSend(topicsMetadata: 
     var written = 0
     if(header.hasRemaining)
       written += channel.write(header)
+    trace("Wrote %d bytes for header".format(written))
+
     if(!header.hasRemaining && metadata.hasRemaining)
       written += channel.write(metadata)
 
+    trace("Wrote %d bytes for header and metadata".format(written))
+
     if(!metadata.hasRemaining)
       complete = true
     written
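
Two fixes above are easy to miss: the fold now starts at 4 because
serializeTopicMetadata reserves four bytes for the topic count, and the
serialized buffer is rewound before the first write. A worked size
calculation, with per-topic sizes assumed purely for illustration:

    // Assumed: two topics whose metadata serialize to 30 and 50 bytes.
    val perTopicSizes = Seq(30, 50)
    val size = perTopicSizes.foldLeft(4)(_ + _)   // 4-byte topic count + 80 = 84
    val headerLen = size + 2                      // + 2-byte error code = 86
    // The old foldLeft(0) omitted the topic-count field, so the header
    // under-reported the payload by four bytes.
    assert(size == 84 && headerLen == 86)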

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Thu Mar  1 21:15:26 2012
@@ -17,41 +17,14 @@
 
 package kafka.cluster
 
-object Partition {
-  def parse(s: String): Partition = {
-    val pieces = s.split("-")
-    if(pieces.length != 2)
-      throw new IllegalArgumentException("Expected name in the form x-y.")
-    new Partition(pieces(0).toInt, pieces(1).toInt)
-  }
-}
-
-class Partition(val brokerId: Int, val partId: Int) extends Ordered[Partition] {
+case class Partition(val brokerId: Int, val partId: Int, val topic: String = "") extends Ordered[Partition] {
 
-  def this(name: String) = {
-    this(1, 1)
-  }
-  
-  def name = brokerId + "-" + partId
-  
-  override def toString(): String = name
+  def name = partId
 
   def compare(that: Partition) =
-    if (this.brokerId == that.brokerId)
+    if (this.topic == that.topic)
       this.partId - that.partId
     else
-      this.brokerId - that.brokerId
-
-  override def equals(other: Any): Boolean = {
-    other match {
-      case that: Partition =>
-        (that canEqual this) && brokerId == that.brokerId && partId == that.partId
-      case _ => false
-    }
-  }
-
-  def canEqual(other: Any): Boolean = other.isInstanceOf[Partition]
-
-  override def hashCode: Int = 31 * (17 + brokerId) + partId
+      this.topic.compareTo(that.topic)
 
 }
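
Ordering is now keyed on (topic, partId) and brokerId no longer participates,
which is the "logical partition" identity this commit is after. Note that the
hand-written equals/hashCode are gone in favour of case-class equality, which
still includes brokerId, so two partitions that compare as equal need not be
equal. The new compare in isolation:

    case class Partition(brokerId: Int, partId: Int, topic: String = "")
        extends Ordered[Partition] {
      def compare(that: Partition) =
        if (this.topic == that.topic) this.partId - that.partId
        else this.topic.compareTo(that.topic)
    }

    // Sorts by topic, then partition id; broker ids are irrelevant.
    val sorted = List(Partition(2, 1, "b"), Partition(0, 3, "a"), Partition(1, 0, "a")).sorted
    // => List(Partition(1,0,a), Partition(0,3,a), Partition(2,1,b))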

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NoLeaderForPartitionException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NoLeaderForPartitionException.scala?rev=1295861&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NoLeaderForPartitionException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NoLeaderForPartitionException.scala Thu Mar  1 21:15:26 2012
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Thrown when a request is made for a partition, but no leader exists for that partition
+ */
+class NoLeaderForPartitionException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file
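
For context, the producer-side BrokerPartitionInfo later in this commit throws
this exception when partition metadata carries no leader. A hypothetical
caller could refresh the metadata cache once and retry (getBrokerPartitionInfo
and updateInfo are defined in BrokerPartitionInfo.scala below):

    // Hypothetical retry wrapper, not part of this commit.
    def partitionsWithOneRetry(info: BrokerPartitionInfo, topic: String) =
      try info.getBrokerPartitionInfo(topic)
      catch {
        case e: NoLeaderForPartitionException =>
          info.updateInfo(topic)
          info.getBrokerPartitionInfo(topic)
      }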

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Thu Mar  1 21:15:26 2012
@@ -22,6 +22,7 @@ import java.nio.channels._
 import kafka.api._
 import kafka.network._
 import kafka.utils._
+import kafka.utils.Utils._
 
 /**
  * A consumer of kafka messages
@@ -77,16 +78,16 @@ class SimpleConsumer(val host: String,
       getOrMakeConnection()
       var response: Tuple2[Receive,Int] = null
       try {
-        sendRequest(request)
-        response = getResponse
+        sendRequest(request, channel)
+        response = getResponse(channel)
       } catch {
         case e : java.io.IOException =>
           info("Reconnect in fetch request due to socket error: ", e)
           // retry once
           try {
             channel = connect
-            sendRequest(request)
-            response = getResponse
+            sendRequest(request, channel)
+            response = getResponse(channel)
           } catch {
             case ioe: java.io.IOException => channel = null; throw ioe;
           }
@@ -115,16 +116,16 @@ class SimpleConsumer(val host: String,
       getOrMakeConnection()
       var response: Tuple2[Receive,Int] = null
       try {
-        sendRequest(new OffsetRequest(topic, partition, time, maxNumOffsets))
-        response = getResponse
+        sendRequest(new OffsetRequest(topic, partition, time, maxNumOffsets), channel)
+        response = getResponse(channel)
       } catch {
         case e : java.io.IOException =>
           info("Reconnect in get offetset request due to socket error: ", e)
           // retry once
           try {
             channel = connect
-            sendRequest(new OffsetRequest(topic, partition, time, maxNumOffsets))
-            response = getResponse
+            sendRequest(new OffsetRequest(topic, partition, time, maxNumOffsets), channel)
+            response = getResponse(channel)
           } catch {
             case ioe: java.io.IOException => channel = null; throw ioe;
           }
@@ -133,20 +134,6 @@ class SimpleConsumer(val host: String,
     }
   }
 
-  private def sendRequest(request: Request) = {
-    val send = new BoundedByteBufferSend(request)
-    send.writeCompletely(channel)
-  }
-
-  private def getResponse(): Tuple2[Receive,Int] = {
-    val response = new BoundedByteBufferReceive()
-    response.readCompletely(channel)
-
-    // this has the side effect of setting the initial position of buffer correctly
-    val errorCode: Int = response.buffer.getShort
-    (response, errorCode)
-  }
-
   private def getOrMakeConnection() {
     if(channel == null) {
       channel = connect()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Mar  1 21:15:26 2012
@@ -29,9 +29,9 @@ import org.apache.zookeeper.Watcher.Even
 import kafka.api.OffsetRequest
 import java.util.UUID
 import kafka.serializer.Decoder
-import kafka.common.{ConsumerRebalanceFailedException, InvalidConfigException}
 import java.lang.IllegalStateException
 import kafka.utils.ZkUtils._
+import kafka.common.{NoBrokersForPartitionException, ConsumerRebalanceFailedException, InvalidConfigException}
 
 /**
  * This class handles the consumers interaction with zookeeper
@@ -201,6 +201,9 @@ private[kafka] class ZookeeperConsumerCo
     ret
   }
 
+  // this API is used by unit tests only
+  def getTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]] = topicRegistry
+
   private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
     info("begin registering consumer " + consumerIdString + " in ZK")
     createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString)
@@ -368,7 +371,7 @@ private[kafka] class ZookeeperConsumerCo
                                 kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]])
     extends IZkChildListener {
     private val dirs = new ZKGroupDirs(group)
-    private var oldPartitionsPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
+    private var oldPartitionsPerTopicMap: mutable.Map[String, Seq[String]] = new mutable.HashMap[String, Seq[String]]()
     private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
 
     @throws(classOf[Exception])
@@ -379,18 +382,17 @@ private[kafka] class ZookeeperConsumerCo
     private def releasePartitionOwnership()= {
       info("Releasing partition ownership")
       for ((topic, infos) <- topicRegistry) {
-        val topicDirs = new ZKGroupTopicDirs(group, topic)
         for(partition <- infos.keys) {
-          val znode = topicDirs.consumerOwnerDir + "/" + partition
-          deletePath(zkClient, znode)
-          debug("Consumer " + consumerIdString + " releasing " + znode)
+          val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition.partId.toString)
+          deletePath(zkClient, partitionOwnerPath)
+          debug("Consumer " + consumerIdString + " releasing " + partitionOwnerPath)
         }
       }
     }
 
     private def getRelevantTopicMap(myTopicThreadIdsMap: Map[String, Set[String]],
-                                    newPartMap: Map[String,List[String]],
-                                    oldPartMap: Map[String,List[String]],
+                                    newPartMap: Map[String, Seq[String]],
+                                    oldPartMap: Map[String, Seq[String]],
                                     newConsumerMap: Map[String,List[String]],
                                     oldConsumerMap: Map[String,List[String]]): Map[String, Set[String]] = {
       var relevantTopicThreadIdsMap = new mutable.HashMap[String, Set[String]]()
@@ -477,7 +479,7 @@ private[kafka] class ZookeeperConsumerCo
 
         val topicDirs = new ZKGroupTopicDirs(group, topic)
         val curConsumers = consumersPerTopicMap.get(topic).get
-        var curPartitions: List[String] = partitionsPerTopicMap.get(topic).get
+        var curPartitions: Seq[String] = partitionsPerTopicMap.get(topic).get
 
         val nPartsPerConsumer = curPartitions.size / curConsumers.size
         val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
@@ -599,8 +601,7 @@ private[kafka] class ZookeeperConsumerCo
         val topic = partitionOwner._1._1
         val partition = partitionOwner._1._2
         val consumerThreadId = partitionOwner._2
-        val topicDirs = new ZKGroupTopicDirs(group, topic)
-        val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
+        val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition)
         try {
           createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
           info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
@@ -618,37 +619,47 @@ private[kafka] class ZookeeperConsumerCo
       else true
     }
 
-    private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partitionString: String,
+    private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partition: String,
                                       topic: String, consumerThreadId: String) {
-      val partition = Partition.parse(partitionString)
       val partTopicInfoMap = topicRegistry.get(topic)
 
-      val znode = topicDirs.consumerOffsetDir + "/" + partition.name
+      // find the leader for this partition
+      val leaderOpt = getLeaderForPartition(zkClient, topic, partition.toInt)
+      leaderOpt match {
+        case None => throw new NoBrokersForPartitionException("No leader available for partition %s on topic %s".
+          format(partition, topic))
+        case Some(l) => debug("Leader for partition %s for topic %s is %d".format(partition, topic, l))
+      }
+      val leader = leaderOpt.get
+
+      val znode = topicDirs.consumerOffsetDir + "/" + partition
       val offsetString = readDataMaybeNull(zkClient, znode)
       // If first time starting a consumer, set the initial offset based on the config
       var offset : Long = 0L
       if (offsetString == null)
         offset = config.autoOffsetReset match {
               case OffsetRequest.SmallestTimeString =>
-                  earliestOrLatestOffset(topic, partition.brokerId, partition.partId, OffsetRequest.EarliestTime)
+                  earliestOrLatestOffset(topic, leader, partition.toInt, OffsetRequest.EarliestTime)
               case OffsetRequest.LargestTimeString =>
-                  earliestOrLatestOffset(topic, partition.brokerId, partition.partId, OffsetRequest.LatestTime)
+                  earliestOrLatestOffset(topic, leader, partition.toInt, OffsetRequest.LatestTime)
               case _ =>
                   throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
         }
       else
         offset = offsetString.toLong
+
+      val partitionObject = new Partition(leader, partition.toInt, topic)
       val queue = queues.get((topic, consumerThreadId))
       val consumedOffset = new AtomicLong(offset)
       val fetchedOffset = new AtomicLong(offset)
       val partTopicInfo = new PartitionTopicInfo(topic,
-                                                 partition.brokerId,
-                                                 partition,
+                                                 leader,
+                                                 partitionObject,
                                                  queue,
                                                  consumedOffset,
                                                  fetchedOffset,
                                                  new AtomicInteger(config.fetchSize))
-      partTopicInfoMap.put(partition, partTopicInfo)
+      partTopicInfoMap.put(partitionObject, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
     }
   }
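
The rewritten addPartitionTopicInfo resolves the partition's leader before
initialising offsets, but the offset decision itself is unchanged in shape.
Extracted as a standalone sketch (assuming the OffsetRequest string constants
are "smallest" and "largest"):

    def initialOffset(offsetString: String, autoOffsetReset: String,
                      earliest: => Long, latest: => Long): Long =
      if (offsetString == null)
        autoOffsetReset match {
          case "smallest" => earliest   // OffsetRequest.SmallestTimeString
          case "largest"  => latest     // OffsetRequest.LargestTimeString
          case _ => throw new IllegalArgumentException("Wrong value in autoOffsetReset")
        }
      else offsetString.toLong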

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Thu Mar  1 21:15:26 2012
@@ -19,7 +19,6 @@ package kafka.log
 
 import java.io._
 import kafka.utils._
-import scala.actors.Actor
 import scala.collection._
 import java.util.concurrent.CountDownLatch
 import kafka.server.{KafkaConfig, KafkaZooKeeper}
@@ -32,7 +31,6 @@ import org.I0Itec.zkclient.ZkClient
  */
 @threadsafe
 private[kafka] class LogManager(val config: KafkaConfig,
-                                private val scheduler: KafkaScheduler,
                                 private val time: Time,
                                 val logCleanupIntervalMs: Long,
                                 val logCleanupDefaultAgeMs: Long,
@@ -47,12 +45,12 @@ private[kafka] class LogManager(val conf
   private val topicPartitionsMap = config.topicPartitionsMap
   private val logCreationLock = new Object
   private val random = new java.util.Random
-  private var zkActor: Actor = null
   private val startupLatch: CountDownLatch = new CountDownLatch(1)
   private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
   private val logFlushIntervalMap = config.flushIntervalMap
   private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
   private val logRetentionSize = config.logRetentionSize
+  private val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
 
   /* Initialize a log for each subdirectory of the main log directory */
   private val logs = new Pool[String, Pool[Int, Log]]()
@@ -78,35 +76,6 @@ private[kafka] class LogManager(val conf
     }
   }
   
-  /* Schedule the cleanup task to delete old logs */
-  if(scheduler != null) {
-    info("starting log cleaner every " + logCleanupIntervalMs + " ms")    
-    scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
-  }
-
-  kafkaZookeeper.startup
-  zkActor = new Actor {
-    def act() {
-      loop {
-        receive {
-          case topic: String =>
-            try {
-              kafkaZookeeper.registerTopicInZk(topic)
-            }
-            catch {
-              case e => error(e) // log it and let it go
-            }
-          case StopActor =>
-            info("zkActor stopped")
-            exit
-        }
-      }
-    }
-  }
-  zkActor.start
-
-  case object StopActor
-
   private def getLogRetentionMSMap(logRetentionHourMap: Map[String, Int]) : Map[String, Long] = {
     var ret = new mutable.HashMap[String, Long]
     for ( (topic, hour) <- logRetentionHourMap )
@@ -118,22 +87,29 @@ private[kafka] class LogManager(val conf
    *  Register this broker in ZK for the first time.
    */
   def startup() {
+    kafkaZookeeper.startup
     kafkaZookeeper.registerBrokerInZk()
-    for (topic <- getAllTopics)
-      kafkaZookeeper.registerTopicInZk(topic)
-    startupLatch.countDown
+
+    /* Schedule the cleanup task to delete old logs */
+    if(scheduler != null) {
+      if(scheduler.hasShutdown) {
+        println("Restarting log cleaner scheduler")
+        scheduler.startUp
+      }
+      info("starting log cleaner every " + logCleanupIntervalMs + " ms")
+      scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
+    }
+
+    if(logFlusherScheduler.hasShutdown) logFlusherScheduler.startUp
     info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervalMap)
     logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate)
+    startupLatch.countDown
   }
 
   private def awaitStartup() {
     startupLatch.await
   }
 
-  private def registerNewTopicInZK(topic: String) {
-    zkActor ! topic
-  }
-
   /**
    * Create a log for the given topic and the given partition
    */
@@ -186,6 +162,10 @@ private[kafka] class LogManager(val conf
    * Create the log if it does not exist, if it exists just return it
    */
   def getOrCreateLog(topic: String, partition: Int): Log = {
+    // TODO: Change this later
+    if(!ZkUtils.isPartitionOnBroker(kafkaZookeeper.zkClient, topic, partition, config.brokerId))
+      throw new InvalidPartitionException("Broker %d does not host partition %d for topic %s".
+        format(config.brokerId, partition, topic))
     var hasNewTopic = false
     var parts = getLogPool(topic, partition)
     if (parts == null) {
@@ -196,6 +176,7 @@ private[kafka] class LogManager(val conf
     }
     var log = parts.get(partition)
     if(log == null) {
+      // check if this broker hosts this partition
       log = createLog(topic, partition)
       val found = parts.putIfNotExists(partition, log)
       if(found != null) {
@@ -207,8 +188,6 @@ private[kafka] class LogManager(val conf
         info("Created log for '" + topic + "'-" + partition)
     }
 
-    if (hasNewTopic)
-      registerNewTopicInZK(topic)
     log
   }
   
@@ -279,11 +258,11 @@ private[kafka] class LogManager(val conf
    */
   def close() {
     info("Closing log manager")
+    scheduler.shutdown()
     logFlusherScheduler.shutdown()
     val iter = getLogIterator
     while(iter.hasNext)
       iter.next.close()
-    zkActor ! StopActor
     kafkaZookeeper.close
   }
   

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Thu Mar  1 21:15:26 2012
@@ -16,44 +16,70 @@
 */
 package kafka.producer
 
-import collection.Map
-import collection.SortedSet
 import kafka.cluster.{Broker, Partition}
+import collection.mutable.HashMap
+import kafka.api.{TopicMetadataRequest, TopicMetadata}
+import java.lang.IllegalStateException
+import kafka.common.NoLeaderForPartitionException
+import kafka.utils.Logging
+
+class BrokerPartitionInfo(producerPool: ProducerPool) extends Logging {
+  val topicPartitionInfo = new HashMap[String, TopicMetadata]()
+  val zkClient = producerPool.getZkClient
 
-trait BrokerPartitionInfo {
   /**
    * Return a sequence of (Partition, Broker) pairs for the given topic,
    * one per partition, each resolved to its current leader.
    * @param topic the topic for which this information is to be returned
    * @return a sequence of (Partition, Broker) pairs sorted by partition id;
    * throws NoLeaderForPartitionException if any partition has no leader.
    */  
-  def getBrokerPartitionInfo(topic: String = null): SortedSet[Partition]
+  def getBrokerPartitionInfo(topic: String): Seq[(Partition, Broker)] = {
+    // check if the cache has metadata for this topic
+    val topicMetadata = topicPartitionInfo.get(topic)
+    val metadata: TopicMetadata =
+    topicMetadata match {
+      case Some(m) => m
+      case None =>
+        // refresh the topic metadata cache
+        info("Fetching metadata for topic %s".format(topic))
+        updateInfo(topic)
+        val topicMetadata = topicPartitionInfo.get(topic)
+        topicMetadata match {
+          case Some(m) => m
+          case None => throw new IllegalStateException("Failed to fetch topic metadata for topic: " + topic)
+        }
+    }
+    val partitionMetadata = metadata.partitionsMetadata
+    partitionMetadata.map { m =>
+      m.leader match {
+        case Some(leader) => (new Partition(leader.id, m.partitionId, topic) -> leader)
+        case None =>  throw new NoLeaderForPartitionException("No leader for topic %s, partition %d".format(topic, m.partitionId))
+      }
+    }.sortWith((s, t) => s._1.partId < t._1.partId)
+  }
 
   /**
-   * Generate the host and port information for the broker identified
-   * by the given broker id 
-   * @param brokerId the broker for which the info is to be returned
-   * @return host and port of brokerId
+   * Updates the cache by issuing a topic metadata request to a random broker.
+   * @param topic the topic for which the metadata is to be fetched
    */
-  def getBrokerInfo(brokerId: Int): Option[Broker]
-
-  /**
-   * Generate a mapping from broker id to the host and port for all brokers
-   * @return mapping from id to host and port of all brokers
-   */
-  def getAllBrokerInfo: Map[Int, Broker]
-
-  /**
-   * This is relevant to the ZKBrokerPartitionInfo. It updates the ZK cache
-   * by reading from zookeeper and recreating the data structures. This API
-   * is invoked by the producer, when it detects that the ZK cache of
-   * ZKBrokerPartitionInfo is stale.
-   *
-   */
-  def updateInfo
-
-  /**
-   * Cleanup
-   */
-  def close
+  def updateInfo(topic: String = null) = {
+    val producer = producerPool.getAnyProducer
+    if(topic != null) {
+      val topicMetadataRequest = new TopicMetadataRequest(List(topic))
+      val topicMetadataList = producer.send(topicMetadataRequest)
+      val topicMetadata:Option[TopicMetadata] = if(topicMetadataList.size > 0) Some(topicMetadataList.head) else None
+      topicMetadata match {
+        case Some(metadata) =>
+          info("Fetched metadata for topics %s".format(topic))
+          topicPartitionInfo += (topic -> metadata)
+        case None =>
+      }
+    } else {
+      // refresh cache for all topics
+      val topics = topicPartitionInfo.keySet.toList
+      val topicMetadata = producer.send(new TopicMetadataRequest(topics))
+      info("Fetched metadata for topics %s".format(topicMetadata.mkString(",")))
+      topicMetadata.foreach(metadata => topicPartitionInfo += (metadata.topic -> metadata))
+    }
+  }
 }
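
The trait-plus-implementations split is gone: BrokerPartitionInfo is now a
concrete class that lazily builds a topic -> TopicMetadata cache on top of the
producer pool. The cache-miss path, modelled in isolation with types
simplified to strings:

    import scala.collection.mutable

    val cache = new mutable.HashMap[String, String]()        // topic -> metadata
    def fetchFromAnyBroker(topic: String) = "metadata-for-" + topic
    def lookup(topic: String): String =
      cache.getOrElseUpdate(topic, fetchFromAnyBroker(topic))

    assert(lookup("t1") == "metadata-for-t1")   // miss: fetched and cached
    assert(cache.contains("t1"))                // later calls hit the cache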

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Thu Mar  1 21:15:26 2012
@@ -18,7 +18,7 @@
 package kafka.producer
 
 import async.MissingConfigException
-import org.apache.log4j.spi.{LoggingEvent, ErrorCode}
+import org.apache.log4j.spi.LoggingEvent
 import org.apache.log4j.AppenderSkeleton
 import org.apache.log4j.helpers.LogLog
 import kafka.utils.Logging

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala Thu Mar  1 21:15:26 2012
@@ -22,15 +22,14 @@ import kafka.common.InvalidConfigExcepti
 import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
 import kafka.serializer.Encoder
 import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
+import org.I0Itec.zkclient.ZkClient
 
 class Producer[K,V](config: ProducerConfig,
                     private val eventHandler: EventHandler[K,V]) // for testing only
 extends Logging {
   private val hasShutdown = new AtomicBoolean(false)
-  if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerList))
-    throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
-  if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerList))
-    throw new InvalidConfigException("Only one of zk.connect and broker.list should be provided")
+  if(!Utils.propertyExists(config.zkConnect))
+    throw new InvalidConfigException("zk.connect property must be specified in the producer")
   if (config.batchSize > config.queueSize)
     throw new InvalidConfigException("Batch size can't be larger than queue size.")
 
@@ -52,15 +51,16 @@ extends Logging {
    * This constructor can be used when all config parameters will be specified through the
    * ProducerConfig object
    * @param config Producer Configuration object
+   * @param zkClient The ZkClient instance used by the producer to connect to ZooKeeper. Used ONLY for testing.
    */
-  def this(config: ProducerConfig) =
+  def this(config: ProducerConfig, zkClient: ZkClient = null) =
     this(config,
          new DefaultEventHandler[K,V](config,
                                       Utils.getObject[Partitioner[K]](config.partitionerClass),
                                       Utils.getObject[Encoder[V]](config.serializerClass),
-                                      new ProducerPool(config),
-                                      populateProducerPool= true,
-                                      brokerPartitionInfo= null))
+                                      new ProducerPool(config, if(zkClient == null)
+                                      new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
+                                        config.zkConnectionTimeoutMs, ZKStringSerializer) else zkClient)))
 
   /**
    * Sends the data, partitioned by key to the topic using either the

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala Thu Mar  1 21:15:26 2012
@@ -29,15 +29,15 @@ class ProducerConfig(val props: Properti
    *  to pass in static broker and per-broker partition information. Format-    *
    *  brokerid1:host1:port1, brokerid2:host2:port2*/
   val brokerList = Utils.getString(props, "broker.list", null)
-  if(brokerList != null && Utils.getString(props, "partitioner.class", null) != null)
-    throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set")
+  if(brokerList != null)
+    throw new InvalidConfigException("broker.list is deprecated. Use zk.connect instead")
 
  /** zk.connect is required; throw an exception if it is missing */
-  if(brokerList != null && zkConnect != null)
-    throw new InvalidConfigException("only one of broker.list and zk.connect can be specified")
+  if(zkConnect == null)
+    throw new InvalidConfigException("zk.connect property is required")
 
   /** the partitioner class for partitioning events amongst sub-topics */
-  val partitionerClass = Utils.getString(props, "partitioner.class", null)
+  val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")
 
   /** this parameter specifies whether the messages are sent asynchronously *
    * or not. Valid values are - async for asynchronous send                 *
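
The configuration rules flip here: broker.list, previously an alternative to
ZooKeeper discovery, is now rejected outright, zk.connect becomes mandatory,
and partitioner.class gains a default. A minimal producer configuration under
the new rules (ZooKeeper address assumed):

    import java.util.Properties

    val props = new Properties()
    props.put("zk.connect", "localhost:2181")   // now mandatory
    // props.put("broker.list", "...")          // now throws InvalidConfigException
    val config = new ProducerConfig(props)      // partitioner.class defaults to
                                                // kafka.producer.DefaultPartitioner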

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala Thu Mar  1 21:15:26 2012
@@ -13,19 +13,23 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package kafka.producer
 
-import java.util.Properties
 import kafka.cluster.Broker
-import kafka.utils.Logging
-import java.util.concurrent.ConcurrentHashMap
-
-class ProducerPool(private val config: ProducerConfig) extends Logging {
-  private val syncProducers = new ConcurrentHashMap[Int, SyncProducer]
+import java.util.Properties
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ZkUtils, Utils, Logging}
+import collection.mutable.HashMap
+import java.lang.Object
+import kafka.common.{UnavailableProducerException, NoBrokersForPartitionException}
+
+class ProducerPool(val config: ProducerConfig, val zkClient: ZkClient) extends Logging {
+  private val syncProducers = new HashMap[Int, SyncProducer]
+  private val lock = new Object()
 
-  def addProducer(broker: Broker) {
+  private def addProducer(broker: Broker) {
     val props = new Properties()
     props.put("host", broker.host)
     props.put("port", broker.port.toString)
@@ -42,17 +46,48 @@ class ProducerPool(private val config: P
     syncProducers.put(brokerId, syncProducer)
   }
 
+  def addProducers(config: ProducerConfig) {
+    lock.synchronized {
+      debug("Connecting to %s for creating sync producers for all brokers in the cluster".format(config.zkConnect))
+      val brokers = ZkUtils.getAllBrokersInCluster(zkClient)
+      brokers.foreach(broker => addProducer(broker))
+    }
+  }
+
   def getProducer(brokerId: Int) : SyncProducer = {
-    syncProducers.get(brokerId)
+    lock.synchronized {
+      val producer = syncProducers.get(brokerId)
+      producer match {
+        case Some(p) => p
+        case None => throw new UnavailableProducerException("Sync producer for broker id %d does not exist".format(brokerId))
+      }
+    }
+  }
+
+  def getAnyProducer: SyncProducer = {
+    lock.synchronized {
+      if(syncProducers.size == 0) {
+        // refresh the list of brokers from zookeeper
+        info("No sync producers available. Refreshing the available broker list from ZK and creating sync producers")
+        addProducers(config)
+        if(syncProducers.size == 0)
+          throw new NoBrokersForPartitionException("No brokers available")
+      }
+      syncProducers.get(Utils.random.nextInt(syncProducers.size)).get
+    }
   }
 
+  def getZkClient: ZkClient = zkClient
+
   /**
    * Closes all the producers in the pool
    */
   def close() = {
-    info("Closing all sync producers")
-    val iter = syncProducers.values.iterator
-    while(iter.hasNext)
-      iter.next.close
+    lock.synchronized {
+      info("Closing all sync producers")
+      val iter = syncProducers.values.iterator
+      while(iter.hasNext)
+        iter.next.close
+    }
   }
 }
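
One subtlety in getAnyProducer above: syncProducers.get(Utils.random.nextInt(syncProducers.size))
looks the map up by key, so it only finds a producer when broker ids happen to
be exactly 0..size-1. Picking a random value instead avoids that assumption; a
possible sketch:

    import scala.collection.mutable
    import scala.util.Random

    // Pick a random map value rather than assuming keys are 0..size-1.
    def pickAny[V](producers: mutable.HashMap[Int, V]): V = {
      val values = producers.values.toIndexedSeq
      values(Random.nextInt(values.size))
    }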

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Thu Mar  1 21:15:26 2012
@@ -26,6 +26,7 @@ import kafka.api._
 import scala.math._
 import kafka.common.MessageSizeTooLargeException
 import java.nio.ByteBuffer
+import kafka.utils.Utils._
 
 object SyncProducer {
   val RequestKey: Short = 0
@@ -124,6 +125,21 @@ class SyncProducer(val config: SyncProdu
     send(new BoundedByteBufferSend(new MultiProducerRequest(produces)))
   }
 
+  def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {
+    lock synchronized {
+      getOrMakeConnection()
+      var response: Tuple2[Receive,Int] = null
+      try {
+        sendRequest(request, channel)
+        response = getResponse(channel)
+      } catch {
+        case e : java.io.IOException => error("Failed to write topic metadata request on the socket channel", e)
+      }
+      // TODO: handle any errors in the response and throw the relevant exception
+      TopicMetadataRequest.deserializeTopicsMetadataResponse(response._1.buffer)
+    }
+  }
+
   def close() = {
     lock synchronized {
       disconnect()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Thu Mar  1 21:15:26 2012
@@ -19,56 +19,26 @@ package kafka.producer.async
 
 import kafka.api.ProducerRequest
 import kafka.serializer.Encoder
-import java.util.Properties
 import kafka.producer._
-import kafka.utils.{ZKConfig, Utils, Logging}
 import kafka.cluster.{Partition, Broker}
 import collection.mutable.{ListBuffer, HashMap}
 import scala.collection.Map
 import kafka.common.{FailedToSendMessageException, InvalidPartitionException, NoBrokersForPartitionException}
 import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
+import kafka.utils.{Utils, Logging}
 
 class DefaultEventHandler[K,V](config: ProducerConfig,                               // this api is for testing
                                private val partitioner: Partitioner[K],              // use the other constructor
                                private val encoder: Encoder[V],
-                               private val producerPool: ProducerPool,
-                               private val populateProducerPool: Boolean,
-                               private var brokerPartitionInfo: BrokerPartitionInfo)
+                               private val producerPool: ProducerPool)
   extends EventHandler[K,V] with Logging {
 
-  private val lock = new Object()
-  private val zkEnabled = Utils.propertyExists(config.zkConnect)
-  if(brokerPartitionInfo == null) {
-    zkEnabled match {
-      case true =>
-        val zkProps = new Properties()
-        zkProps.put("zk.connect", config.zkConnect)
-        zkProps.put("zk.sessiontimeout.ms", config.zkSessionTimeoutMs.toString)
-        zkProps.put("zk.connectiontimeout.ms", config.zkConnectionTimeoutMs.toString)
-        zkProps.put("zk.synctime.ms", config.zkSyncTimeMs.toString)
-        brokerPartitionInfo = new ZKBrokerPartitionInfo(new ZKConfig(zkProps), producerCbk)
-      case false =>
-        brokerPartitionInfo = new ConfigBrokerPartitionInfo(config)
-    }
-  }
+  val brokerPartitionInfo = new BrokerPartitionInfo(producerPool)
 
-  // pool of producers, one per broker
-  if(populateProducerPool) {
-    val allBrokers = brokerPartitionInfo.getAllBrokerInfo
-    allBrokers.foreach(b => producerPool.addProducer(new Broker(b._1, b._2.host, b._2.host, b._2.port)))
-  }
+  // add producers to the producer pool
+  producerPool.addProducers(config)
 
-  /**
-   * Callback to add a new producer to the producer pool. Used by ZKBrokerPartitionInfo
-   * on registration of new broker in zookeeper
-   * @param bid the id of the broker
-   * @param host the hostname of the broker
-   * @param port the port of the broker
-   */
-  private def producerCbk(bid: Int, host: String, port: Int) =  {
-    if(populateProducerPool) producerPool.addProducer(new Broker(bid, host, host, port))
-    else debug("Skipping the callback since populateProducerPool = false")
-  }
+  private val lock = new Object()
 
   def handle(events: Seq[ProducerData[K,V]]) {
     lock synchronized {
@@ -81,7 +51,7 @@ class DefaultEventHandler[K,V](config: P
       val partitionedData = partitionAndCollate(messages)
       for ( (brokerid, eventsPerBrokerMap) <- partitionedData) {
         if (logger.isTraceEnabled)
-          eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partition: %d"
+          eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
             .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
         val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
 
@@ -98,7 +68,7 @@ class DefaultEventHandler[K,V](config: P
               numRetries +=1
               Thread.sleep(config.producerRetryBackoffMs)
               try {
-                brokerPartitionInfo.updateInfo
+                brokerPartitionInfo.updateInfo()
                 handleSerializedData(eventsPerBroker, 0)
                 return
               }
@@ -125,15 +95,15 @@ class DefaultEventHandler[K,V](config: P
       val brokerPartition = topicPartitionsList(partitionIndex)
 
       var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
-      ret.get(brokerPartition.brokerId) match {
+      ret.get(brokerPartition._2.id) match {
         case Some(element) =>
           dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]]
         case None =>
           dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]]
-          ret.put(brokerPartition.brokerId, dataPerBroker)
+          ret.put(brokerPartition._2.id, dataPerBroker)
       }
 
-      val topicAndPartition = (event.getTopic, brokerPartition.partId)
+      val topicAndPartition = (event.getTopic, brokerPartition._1.partId)
       var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
       dataPerBroker.get(topicAndPartition) match {
         case Some(element) =>
@@ -147,9 +117,9 @@ class DefaultEventHandler[K,V](config: P
     ret
   }
 
-  private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[Partition] = {
+  private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[(Partition, Broker)] = {
     debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
-    val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
+    val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic)
     debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList)
     val totalNumPartitions = topicPartitionsList.length
     if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
@@ -168,7 +138,7 @@ class DefaultEventHandler[K,V](config: P
       throw new InvalidPartitionException("Invalid number of partitions: " + numPartitions +
               "\n Valid values are > 0")
     val partition = if(key == null) Utils.getNextRandomInt(numPartitions)
-                    else partitioner.partition(key , numPartitions)
+                    else partitioner.partition(key, numPartitions)
     if(partition < 0 || partition >= numPartitions)
       throw new InvalidPartitionException("Invalid partition id : " + partition +
               "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
@@ -235,7 +205,5 @@ class DefaultEventHandler[K,V](config: P
   def close() {
     if (producerPool != null)
       producerPool.close    
-    if (brokerPartitionInfo != null)
-      brokerPartitionInfo.close
   }
 }
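
The partition-selection logic in getPartition keeps its old shape: random when
the key is null, otherwise delegate to the partitioner, then range-check the
result. A standalone sketch, with hash-modulo standing in for the configured
partitioner:

    import scala.util.Random

    def getPartition(key: String, numPartitions: Int): Int = {
      if (numPartitions <= 0)
        throw new IllegalArgumentException("Invalid number of partitions: " + numPartitions)
      val partition =
        if (key == null) Random.nextInt(numPartitions)
        else math.abs(key.hashCode) % numPartitions   // stand-in partitioner
      if (partition < 0 || partition >= numPartitions)
        throw new IllegalArgumentException("Invalid partition id: " + partition)
      partition
    }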

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Mar  1 21:15:26 2012
@@ -173,6 +173,7 @@ class KafkaApis(val logManager: LogManag
           }
       }
     }
+    info("Sending response for topic metadata request")
     Some(new TopicMetadataSend(topicsMetadata))
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Thu Mar  1 21:15:26 2012
@@ -20,7 +20,7 @@ package kafka.server
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import java.io.File
-import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler, Logging}
+import kafka.utils.{Mx4jLoader, Utils, SystemTime, Logging}
 import kafka.network.{SocketServerStats, SocketServer}
 import kafka.log.LogManager
 
@@ -36,7 +36,6 @@ class KafkaServer(val config: KafkaConfi
   private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
-  val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
   private var logManager: LogManager = null
 
   /**
@@ -52,7 +51,6 @@ class KafkaServer(val config: KafkaConfi
       cleanShutDownFile.delete
     }
     logManager = new LogManager(config,
-                                scheduler,
                                 SystemTime,
                                 1000L * 60 * config.logCleanupIntervalMinutes,
                                 1000L * 60 * 60 * config.logRetentionHours,
@@ -85,7 +83,6 @@ class KafkaServer(val config: KafkaConfi
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
       info("Shutting down Kafka server")
-      scheduler.shutdown()
       if (socketServer != null)
         socketServer.shutdown()
       if(requestHandlerPool != null)
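
With the kafka-logcleaner scheduler deleted from KafkaServer and dropped from the LogManager constructor, log-cleanup scheduling presumably moves inside LogManager itself. A rough sketch of that ownership, assuming LogManager recreates the same one-thread, non-daemon scheduler it used to be handed (LogManagerSketch and cleanupLogs are illustrative names, not the committed code):

    import kafka.utils.KafkaScheduler

    class LogManagerSketch(cleanupIntervalMs: Long) {
      // same construction arguments the deleted KafkaServer field used
      private val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
      // run cleanup periodically; the body is elided in this sketch
      scheduler.scheduleWithRate(() => cleanupLogs(), 60 * 1000, cleanupIntervalMs)
      private def cleanupLogs(): Unit = { /* delete expired log segments */ }
      def close(): Unit = scheduler.shutdown()
    }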

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Thu Mar  1 21:15:26 2012
@@ -51,21 +51,6 @@ class KafkaZooKeeper(config: KafkaConfig
     ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
   }
 
-  def registerTopicInZk(topic: String) {
-    registerTopicInZkInternal(topic)
-    lock synchronized {
-      topics ::= topic
-    }
-  }
-
-  def registerTopicInZkInternal(topic: String) {
-    val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + config.brokerId
-    val numParts = logManager.getTopicPartitionsMap.getOrElse(topic, config.numPartitions)
-    info("Begin registering broker topic " + brokerTopicPath + " with " + numParts.toString + " partitions")
-    ZkUtils.createEphemeralPathExpectConflict(zkClient, brokerTopicPath, numParts.toString)
-    info("End registering broker topic " + brokerTopicPath)
-  }
-
   /**
    *  When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
    *  connection for us. We need to re-register this broker in the broker registry.
@@ -87,11 +72,6 @@ class KafkaZooKeeper(config: KafkaConfig
     def handleNewSession() {
       info("re-registering broker info in ZK for broker " + config.brokerId)
       registerBrokerInZk()
-      lock synchronized {
-        info("re-registering broker topics in ZK for broker " + config.brokerId)
-        for (topic <- topics)
-          registerTopicInZkInternal(topic)
-      }
       info("done re-registering broker")
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala Thu Mar  1 21:15:26 2012
@@ -19,6 +19,7 @@ package kafka.utils
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
+import java.lang.IllegalStateException
 
 /**
  * A scheduler for running jobs in the background
@@ -26,25 +27,41 @@ import java.util.concurrent.atomic._
  */
 class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon: Boolean) extends Logging {
   private val threadId = new AtomicLong(0)
-  private val executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() {
-    def newThread(runnable: Runnable): Thread = {
-      val t = new Thread(runnable, baseThreadName + threadId.getAndIncrement)
-      t.setDaemon(isDaemon)
-      t
-    }
-  })
-  executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
-  executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+  private var executor:ScheduledThreadPoolExecutor = null
+  startUp
 
-  def scheduleWithRate(fun: () => Unit, delayMs: Long, periodMs: Long) =
+  def startUp = {
+    executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() {
+      def newThread(runnable: Runnable): Thread = {
+        val t = new Thread(runnable, baseThreadName + threadId.getAndIncrement)
+        t.setDaemon(isDaemon)
+        t
+      }
+    })
+    executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
+    executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+  }
+
+  def hasShutdown: Boolean = executor.isShutdown
+
+  private def checkIfExecutorHasStarted = {
+    if(executor == null)
+      throw new IllegalStateException("Kafka scheduler has not been started")
+  }
+
+  def scheduleWithRate(fun: () => Unit, delayMs: Long, periodMs: Long) = {
+    checkIfExecutorHasStarted
     executor.scheduleAtFixedRate(Utils.loggedRunnable(fun), delayMs, periodMs, TimeUnit.MILLISECONDS)
+  }
 
   def shutdownNow() {
+    checkIfExecutorHasStarted
     executor.shutdownNow()
     info("force shutdown scheduler " + baseThreadName)
   }
 
   def shutdown() {
+    checkIfExecutorHasStarted
     executor.shutdown()
     info("shutdown scheduler " + baseThreadName)
   }
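
The scheduler's executor is now created in startUp (invoked from the constructor) rather than at field initialization, and every public operation first runs checkIfExecutorHasStarted, trading a NullPointerException for an explicit IllegalStateException. A minimal usage sketch:

    import kafka.utils.KafkaScheduler

    val scheduler = new KafkaScheduler(1, "demo-", true)         // constructor calls startUp
    scheduler.scheduleWithRate(() => println("tick"), 0L, 1000L) // fire once a second
    // ... later, during shutdown ...
    if (!scheduler.hasShutdown)
      scheduler.shutdown() // would throw IllegalStateException had startUp never run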

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Thu Mar  1 21:15:26 2012
@@ -19,12 +19,11 @@ package kafka.utils
 
 import org.I0Itec.zkclient.ZkClient
 import kafka.consumer.{SimpleConsumer, ConsumerConfig}
-import kafka.cluster.Partition
 import kafka.api.OffsetRequest
 import java.lang.IllegalStateException
 
 /**
- *  A utility that updates the offset of every broker partition to the offset of latest log segment file, in ZK.
+ *  A utility that updates the offset of every broker partition to the offset of earliest or latest log segment file, in ZK.
  */
 object UpdateOffsetsInZK {
   val Earliest = "earliest"
@@ -46,7 +45,7 @@ object UpdateOffsetsInZK {
   private def getAndSetOffsets(zkClient: ZkClient, offsetOption: Long, config: ConsumerConfig, topic: String): Unit = {
     val cluster = ZkUtils.getCluster(zkClient)
     val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic).iterator)
-    var partitions: List[String] = Nil
+    var partitions: Seq[String] = Nil
 
     partitionsPerTopicMap.get(topic) match {
       case Some(l) =>  partitions = l.sortWith((s,t) => s < t)
@@ -54,22 +53,29 @@ object UpdateOffsetsInZK {
     }
 
     var numParts = 0
-    for (partString <- partitions) {
-      val part = Partition.parse(partString)
-      val broker = cluster.getBroker(part.brokerId) match {
+    for (partition <- partitions) {
+      val brokerHostingPartition = ZkUtils.getLeaderForPartition(zkClient, topic, partition.toInt)
+
+      val broker = brokerHostingPartition match {
         case Some(b) => b
-        case None => throw new IllegalStateException("Broker " + part.brokerId + " is unavailable. Cannot issue " +
+        case None => throw new IllegalStateException("Broker " + brokerHostingPartition + " is unavailable. Cannot issue " +
           "getOffsetsBefore request")
       }
-      val consumer = new SimpleConsumer(broker.host, broker.port, 10000, 100 * 1024)
-      val offsets = consumer.getOffsetsBefore(topic, part.partId, offsetOption, 1)
+
+      val brokerInfos = ZkUtils.getBrokerInfoFromIds(zkClient, List(broker))
+      if(brokerInfos.size == 0)
+        throw new IllegalStateException("Broker information for broker id %d does not exist in ZK".format(broker))
+
+      val brokerInfo = brokerInfos.head
+      val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
+      val offsets = consumer.getOffsetsBefore(topic, partition.toInt, offsetOption, 1)
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
-      
-      println("updating partition " + part.name + " with new offset: " + offsets(0))
-      ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + part.name, offsets(0).toString)
+
+      println("updating partition " + partition + " with new offset: " + offsets(0))
+      ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offsets(0).toString)
       numParts += 1
     }
-    println("updated the offset for " + numParts + " partitions")    
+    println("updated the offset for " + numParts + " partitions")
   }
 
   private def usage() = {
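
For a single (topic, partition), the loop above now resolves the partition's interim leader from ZK instead of parsing a "brokerId-partition" string. A condensed sketch of that lookup (latestOffsetFor is an illustrative helper, and OffsetRequest.LatestTime is assumed to be the sentinel passed as offsetOption):

    import org.I0Itec.zkclient.ZkClient
    import kafka.api.OffsetRequest
    import kafka.consumer.SimpleConsumer
    import kafka.utils.ZkUtils

    def latestOffsetFor(zkClient: ZkClient, topic: String, partition: Int): Long = {
      // interim leader = first replica, per ZkUtils.getLeaderForPartition
      val leaderId = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
        .getOrElse(throw new IllegalStateException("no leader for partition " + partition))
      // map the broker id to its registered host and port
      val brokerInfo = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderId)).head
      val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
      // ask the leader for the offset of its latest log segment
      consumer.getOffsetsBefore(topic, partition, OffsetRequest.LatestTime, 1)(0)
    }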

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Thu Mar  1 21:15:26 2012
@@ -29,6 +29,7 @@ import scala.collection.mutable
 import kafka.message.{NoCompressionCodec, CompressionCodec}
 import org.I0Itec.zkclient.ZkClient
 import java.util.{Random, Properties}
+import kafka.network.{BoundedByteBufferReceive, Receive, BoundedByteBufferSend, Request}
 
 /**
  * Helper functions!
@@ -669,6 +670,20 @@ object Utils extends Logging {
       case _ => // swallow
     }
   }
+
+  def sendRequest(request: Request, channel: SocketChannel) = {
+    val send = new BoundedByteBufferSend(request)
+    send.writeCompletely(channel)
+  }
+
+  def getResponse(channel: SocketChannel): Tuple2[Receive,Int] = {
+    val response = new BoundedByteBufferReceive()
+    response.readCompletely(channel)
+
+    // this has the side effect of setting the initial position of buffer correctly
+    val errorCode: Int = response.buffer.getShort
+    (response, errorCode)
+  }
 }
 
 class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {
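
The two new helpers give callers a blocking request/response round trip over a SocketChannel; note that getResponse's getShort deliberately advances the buffer past the two-byte error code, so the remaining bytes are the payload. A usage sketch (the request value and broker address are placeholders):

    import java.net.InetSocketAddress
    import java.nio.channels.SocketChannel
    import kafka.network.Request
    import kafka.utils.Utils

    def roundTrip(request: Request): Unit = {
      val channel = SocketChannel.open(new InetSocketAddress("localhost", 9092))
      try {
        Utils.sendRequest(request, channel)          // frames and fully writes the request
        val (receive, errorCode) = Utils.getResponse(channel)
        if (errorCode != 0)
          throw new RuntimeException("request failed, error code " + errorCode)
        // receive's buffer is now positioned at the start of the payload
      } finally {
        channel.close()
      }
    }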

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Thu Mar  1 21:15:26 2012
@@ -35,7 +35,7 @@ object ZkUtils extends Logging {
   }
 
   def getTopicPartitionsPath(topic: String): String ={
-    getTopicPath(topic) + "/" + "partitions"
+    getTopicPath(topic) + "/partitions"
   }
 
   def getTopicPartitionPath(topic: String, partitionId: String): String ={
@@ -62,6 +62,38 @@ object ZkUtils extends Logging {
       ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
   }
 
+  def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
+    val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
+    getBrokerInfoFromIds(zkClient, brokerIds.map(b => b.toInt))
+  }
+
+  def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = {
+    // TODO: When leader election is implemented, change this method to return the leader as follows
+    // until then, assume the first replica as the leader
+//    val leader = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
+    val replicaListString = readDataMaybeNull(zkClient, getTopicPartitionReplicasPath(topic, partition.toString))
+    val replicas = Utils.getCSVList(replicaListString)
+    replicas.size match {
+      case 0 => None
+      case _ => Some(replicas.head.toInt)
+    }
+  }
+
+  def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = {
+    val replicaListString = readDataMaybeNull(zkClient, getTopicPartitionReplicasPath(topic, partition.toString))
+    if(replicaListString == null)
+      Seq.empty[String]
+    else {
+      Utils.getCSVList(replicaListString)
+    }
+  }
+
+  def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
+    val replicas = getReplicasForPartition(zkClient, topic, partition)
+    debug("The list of replicas for topic %s, partition %d is %s".format(topic, partition, replicas))
+    replicas.contains(brokerId.toString)
+  }
+
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val broker = new Broker(id, creator, host, port)
@@ -77,6 +109,11 @@ object ZkUtils extends Logging {
     info("Registering broker " + brokerIdPath + " succeeded with " + broker)
   }
 
+  def getConsumerPartitionOwnerPath(group: String, topic: String, partition: String): String = {
+    val topicDirs = new ZKGroupTopicDirs(group, topic)
+    topicDirs.consumerOwnerDir + "/" + partition
+  }
+
   /**
    *  make sure a persistent path exists in ZK. Create the path if not exist.
    */
@@ -269,30 +306,17 @@ object ZkUtils extends Logging {
     cluster
   }
 
-  def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, List[String]] = {
-    val ret = new mutable.HashMap[String, List[String]]()
-    for (topic <- topics) {
-      var partList: List[String] = Nil
-      val brokers = getChildrenParentMayNotExist(zkClient, BrokerTopicsPath + "/" + topic)
-      for (broker <- brokers) {
-        val nParts = readData(zkClient, BrokerTopicsPath + "/" + topic + "/" + broker).toInt
-        for (part <- 0 until nParts)
-          partList ::= broker + "-" + part
-      }
-      partList = partList.sortWith((s,t) => s < t)
-      ret += (topic -> partList)
+  def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[String]] = {
+    val ret = new mutable.HashMap[String, Seq[String]]()
+    topics.foreach { topic =>
+      // get the partitions that exist for topic
+      val partitions = getChildrenParentMayNotExist(zkClient, getTopicPartitionsPath(topic))
+      debug("children of /brokers/topics/%s are %s".format(topic, partitions))
+      ret += (topic -> partitions.sortWith((s,t) => s < t))
     }
     ret
   }
 
-  def setupPartition(zkClient : ZkClient, brokerId: Int, host: String, port: Int, topic: String, nParts: Int) {
-    val brokerIdPath = BrokerIdsPath + "/" + brokerId
-    val broker = new Broker(brokerId, brokerId.toString, host, port)
-    createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
-    val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
-    createEphemeralPathExpectConflict(zkClient, brokerPartTopicPath, nParts.toString)    
-  }
-
   def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
     val brokerIdPath = BrokerIdsPath + "/" + brokerId
     zkClient.delete(brokerIdPath)
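
Taken together, the new accessors let any client answer placement questions from the logical layout under the topic partitions path. A quick sketch against a hypothetical local ZooKeeper and topic ("test", partition 0), assuming the kafka.utils.ZKStringSerializer that Kafka registers on its ZkClient instances:

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.{ZkUtils, ZKStringSerializer}

    val zkClient = new ZkClient("localhost:2181", 6000, 6000, ZKStringSerializer)
    val replicas = ZkUtils.getReplicasForPartition(zkClient, "test", 0) // e.g. Seq("0", "1")
    val leader   = ZkUtils.getLeaderForPartition(zkClient, "test", 0)   // Some(0): head of the replica list until real leader election lands
    val hosted   = ZkUtils.isPartitionOnBroker(zkClient, "test", 0, 1)  // true iff "1" appears in the replica list
    val brokers  = ZkUtils.getAllBrokersInCluster(zkClient)             // Broker objects for every live broker id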

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Thu Mar  1 21:15:26 2012
@@ -169,7 +169,8 @@ class AdminTest extends JUnit3Suite with
       case Some(metadata) => assertEquals(topic, metadata.topic)
         assertNotNull("partition metadata list cannot be null", metadata.partitionsMetadata)
         assertEquals("partition metadata list length should be 2", 2, metadata.partitionsMetadata.size)
-        assertNull("leader should not be assigned for now", metadata.partitionsMetadata.head.leader.getOrElse(null))
+        assertEquals("leader of partition 0 should be 0", 0, metadata.partitionsMetadata.head.leader.get.id)
+        assertEquals("leader of partition 1 should be 1", 1, metadata.partitionsMetadata.last.leader.get.id)
         val actualReplicaAssignment = metadata.partitionsMetadata.map(p => p.replicas)
         val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
         assertEquals(expectedReplicaAssignment.toList, actualReplicaList)
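
The flipped assertion tracks the interim leader rule introduced in ZkUtils above: until leader election is implemented, the leader is the head of each partition's replica list. A sketch of the arithmetic, assuming (hypothetically, since the assignment is defined earlier in the test) that expectedReplicaAssignment maps partition 0 to replicas ("0","1") and partition 1 to ("1","0"):

    // head of each replica list = interim leader, per ZkUtils.getLeaderForPartition
    val expectedReplicaAssignment = List(List("0", "1"), List("1", "0")) // hypothetical values
    val interimLeaders = expectedReplicaAssignment.map(_.head.toInt)    // List(0, 1)
    assert(interimLeaders == List(0, 1)) // matches the new leader assertions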


