kafka-commits mailing list archives

From jkr...@apache.org
Subject [2/2] git commit: KAFKA-554 Dynamic per-topic configuration. This patch adds a mechanism for storing per-topic configurations in zookeeper and dynamically making config changes across the cluster. Reviewed by Neha and Jun.
Date Fri, 08 Mar 2013 23:09:38 GMT
Updated Branches:
  refs/heads/trunk 4f2742d60 -> c1ed12e44


KAFKA-554 Dynamic per-topic configuration. This patch adds a mechanism for storing per-topic configurations in zookeeper and dynamically making config changes across the cluster. Reviewed by Neha and Jun.
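
For orientation, a minimal sketch of the admin API this patch introduces, using the AdminUtils and TopicCommand code shown in the diff below (the ZooKeeper address, topic name, and config values are illustrative only):

    import java.util.Properties
    import org.I0Itec.zkclient.ZkClient
    import kafka.admin.AdminUtils
    import kafka.utils.ZKStringSerializer

    // connect the same way the new kafka.admin.TopicCommand does
    val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)

    // create a topic with a per-topic config override stored in ZooKeeper
    val overrides = new Properties()
    overrides.setProperty("max.message.bytes", "64000")
    AdminUtils.createTopic(zkClient, "my-topic", 1, 1, overrides)

    // change the override later; this also writes a config_change_* notification
    // znode so that running brokers pick up the new value dynamically
    overrides.setProperty("flush.messages", "1")
    AdminUtils.changeTopicConfig(zkClient, "my-topic", overrides)

    zkClient.close()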


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c1ed12e4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c1ed12e4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c1ed12e4

Branch: refs/heads/trunk
Commit: c1ed12e44ddebe41dc464683e3d7eeb4e6d39a45
Parents: 4f2742d
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Fri Mar 8 15:07:39 2013 -0800
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Fri Mar 8 15:07:39 2013 -0800

----------------------------------------------------------------------
 bin/kafka-create-topic.sh                          |   19 --
 bin/kafka-delete-topic.sh                          |   19 --
 bin/kafka-list-topic.sh                            |   19 --
 bin/kafka-topics.sh                                |   19 ++
 .../kafka/admin/AdminOperationException.scala      |   23 ++
 core/src/main/scala/kafka/admin/AdminUtils.scala   |  137 +++++++++--
 .../scala/kafka/admin/CreateTopicCommand.scala     |  117 ---------
 .../PreferredReplicaLeaderElectionCommand.scala    |    6 +-
 core/src/main/scala/kafka/admin/TopicCommand.scala |  185 +++++++++++++++
 core/src/main/scala/kafka/cluster/Partition.scala  |    5 +-
 .../scala/kafka/common/KafkaZookeperClient.scala   |   35 ---
 core/src/main/scala/kafka/log/Log.scala            |    2 +-
 core/src/main/scala/kafka/log/LogConfig.scala      |   97 ++++++++-
 core/src/main/scala/kafka/log/LogManager.scala     |   31 +--
 .../main/scala/kafka/network/SocketServer.scala    |    2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   43 ++--
 .../main/scala/kafka/server/KafkaHealthcheck.scala |   86 +++++++
 .../scala/kafka/server/KafkaRequestHandler.scala   |    2 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |   77 +++---
 .../main/scala/kafka/server/KafkaZooKeeper.scala   |   90 -------
 .../main/scala/kafka/server/ReplicaManager.scala   |    2 +-
 .../scala/kafka/server/TopicConfigManager.scala    |  133 +++++++++++
 .../main/scala/kafka/utils/CommandLineUtils.scala  |    2 +-
 core/src/main/scala/kafka/utils/Json.scala         |   31 +++
 core/src/main/scala/kafka/utils/Utils.scala        |   19 ++
 core/src/main/scala/kafka/utils/ZkUtils.scala      |   25 ++-
 .../test/scala/unit/kafka/admin/AdminTest.scala    |  158 ++++++------
 .../unit/kafka/consumer/ConsumerIteratorTest.scala |    5 +-
 .../consumer/ZookeeperConsumerConnectorTest.scala  |    4 +-
 .../kafka/integration/AutoOffsetResetTest.scala    |    3 +-
 .../scala/unit/kafka/integration/FetcherTest.scala |    4 +-
 .../unit/kafka/integration/PrimitiveApiTest.scala  |   27 +--
 .../unit/kafka/integration/TopicMetadataTest.scala |    8 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   12 +-
 .../scala/unit/kafka/producer/ProducerTest.scala   |   23 +--
 .../unit/kafka/producer/SyncProducerTest.scala     |    8 +-
 .../server/HighwatermarkPersistenceTest.scala      |    6 +-
 .../unit/kafka/server/LeaderElectionTest.scala     |    6 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala    |   16 +-
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |   23 +-
 .../scala/unit/kafka/server/OffsetCommitTest.scala |    1 -
 .../scala/unit/kafka/server/ReplicaFetchTest.scala |   10 +-
 .../unit/kafka/server/ServerShutdownTest.scala     |    4 +-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |    4 +-
 .../src/test/scala/unit/kafka/utils/JsonTest.scala |   27 ++
 .../scala/unit/kafka/utils/SchedulerTest.scala     |    8 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   13 +-
 47 files changed, 1003 insertions(+), 593 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/bin/kafka-create-topic.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-create-topic.sh b/bin/kafka-create-topic.sh
deleted file mode 100755
index fccda7b..0000000
--- a/bin/kafka-create-topic.sh
+++ /dev/null
@@ -1,19 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-base_dir=$(dirname $0)
-export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
-$base_dir/kafka-run-class.sh kafka.admin.CreateTopicCommand $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/bin/kafka-delete-topic.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-delete-topic.sh b/bin/kafka-delete-topic.sh
deleted file mode 100755
index f266ae3..0000000
--- a/bin/kafka-delete-topic.sh
+++ /dev/null
@@ -1,19 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-base_dir=$(dirname $0)
-export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
-$base_dir/kafka-run-class.sh kafka.admin.DeleteTopicCommand $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/bin/kafka-list-topic.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-list-topic.sh b/bin/kafka-list-topic.sh
deleted file mode 100755
index 1235ad0..0000000
--- a/bin/kafka-list-topic.sh
+++ /dev/null
@@ -1,19 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-base_dir=$(dirname $0)
-export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
-$base_dir/kafka-run-class.sh kafka.admin.ListTopicCommand $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/bin/kafka-topics.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-topics.sh b/bin/kafka-topics.sh
new file mode 100755
index 0000000..b3195ee
--- /dev/null
+++ b/bin/kafka-topics.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)
+export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
+$base_dir/kafka-run-class.sh kafka.admin.TopicCommand $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/admin/AdminOperationException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminOperationException.scala b/core/src/main/scala/kafka/admin/AdminOperationException.scala
new file mode 100644
index 0000000..a45b3f7
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/AdminOperationException.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+class AdminOperationException(val error: String, cause: Throwable) extends RuntimeException(error, cause) {
+  def this(error: Throwable) = this(error.getMessage, error)
+  def this(msg: String) = this(msg, null)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index b9ef4dc..6479385 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -18,9 +18,13 @@
 package kafka.admin
 
 import java.util.Random
+import java.util.Properties
 import kafka.api.{TopicMetadata, PartitionMetadata}
 import kafka.cluster.Broker
 import kafka.utils.{Logging, ZkUtils}
+import kafka.log.LogConfig
+import kafka.server.TopicConfigManager
+import kafka.utils.{Logging, Utils, ZkUtils, Json}
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection._
@@ -30,7 +34,7 @@ import scala.Some
 
 object AdminUtils extends Logging {
   val rand = new Random
-  val AdminEpoch = -1
+  val TopicConfigChangeZnodePrefix = "config_change_"
 
   /**
    * There are 2 goals of replica assignment:
@@ -50,33 +54,74 @@ object AdminUtils extends Logging {
    * p3        p4        p0        p1        p2       (3nd replica)
    * p7        p8        p9        p5        p6       (3nd replica)
    */
-  def assignReplicasToBrokers(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int,
+  def assignReplicasToBrokers(brokers: Seq[Int], 
+                              partitions: Int, 
+                              replicationFactor: Int,
                               fixedStartIndex: Int = -1)  // for testing only
   : Map[Int, Seq[Int]] = {
-    if (nPartitions <= 0)
-      throw new AdministrationException("number of partitions must be larger than 0")
+    if (partitions <= 0)
+      throw new AdminOperationException("number of partitions must be larger than 0")
     if (replicationFactor <= 0)
-      throw new AdministrationException("replication factor must be larger than 0")
-    if (replicationFactor > brokerList.size)
-      throw new AdministrationException("replication factor: " + replicationFactor +
-        " larger than available brokers: " + brokerList.size)
+      throw new AdminOperationException("replication factor must be larger than 0")
+    if (replicationFactor > brokers.size)
+      throw new AdminOperationException("replication factor: " + replicationFactor +
+        " larger than available brokers: " + brokers.size)
     val ret = new mutable.HashMap[Int, List[Int]]()
-    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
+    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokers.size)
 
-    var secondReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
-    for (i <- 0 until nPartitions) {
-      if (i > 0 && (i % brokerList.size == 0))
+    var secondReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokers.size)
+    for (i <- 0 until partitions) {
+      if (i > 0 && (i % brokers.size == 0))
         secondReplicaShift += 1
-      val firstReplicaIndex = (i + startIndex) % brokerList.size
-      var replicaList = List(brokerList(firstReplicaIndex))
+      val firstReplicaIndex = (i + startIndex) % brokers.size
+      var replicaList = List(brokers(firstReplicaIndex))
       for (j <- 0 until replicationFactor - 1)
-        replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size))
+        replicaList ::= brokers(replicaIndex(firstReplicaIndex, secondReplicaShift, j, brokers.size))
       ret.put(i, replicaList.reverse)
     }
     ret.toMap
   }
+  
+  def deleteTopic(zkClient: ZkClient, topic: String) {
+    zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
+    zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
+  }
+  
+  def topicExists(zkClient: ZkClient, topic: String): Boolean = 
+    zkClient.exists(ZkUtils.getTopicPath(topic))
+    
+  def createTopic(zkClient: ZkClient,
+                  topic: String,
+                  partitions: Int, 
+                  replicationFactor: Int, 
+                  topicConfig: Properties = new Properties) {
+    val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+    val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
+    AdminUtils.createTopicWithAssignment(zkClient, topic, replicaAssignment, topicConfig)
+  }
+                  
+  def createTopicWithAssignment(zkClient: ZkClient, 
+                                topic: String, 
+                                partitionReplicaAssignment: Map[Int, Seq[Int]], 
+                                config: Properties = new Properties) {
+    // validate arguments
+    Topic.validate(topic)
+    LogConfig.validate(config)
+    require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.")
 
-  def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[Int]], zkClient: ZkClient) {
+    val topicPath = ZkUtils.getTopicPath(topic)
+    if(zkClient.exists(topicPath))
+      throw new TopicExistsException("Topic \"%s\" already exists.".format(topic))
+    partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: "  + partitionReplicaAssignment))
+    
+    // write out the config if there is any, this isn't transactional with the partition assignments
+    writeTopicConfig(zkClient, topic, config)
+    
+    // create the partition assignment
+    writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment)
+  }
+  
+  private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]]) {
     try {
       val zkPath = ZkUtils.getTopicPath(topic)
       val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2)))
@@ -84,9 +129,61 @@ object AdminUtils extends Logging {
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
     } catch {
       case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
-      case e2 => throw new AdministrationException(e2.toString)
+      case e2 => throw new AdminOperationException(e2.toString)
+    }
+  }
+  
+  /**
+   * Update the config for an existing topic and create a change notification so the change will propagate to other brokers
+   */
+  def changeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) {
+    LogConfig.validate(config)
+    if(!topicExists(zkClient, topic))
+      throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
+    
+    // write the new config--may not exist if there were previously no overrides
+    writeTopicConfig(zkClient, topic, config)
+    
+    // create the change notification
+    zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, Json.encode(topic))
+  }
+  
+  /**
+   * Write out the topic config to zk, if there is any
+   */
+  private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) {
+    if(config.size > 0) {
+      val map = Map("version" -> 1, "config" -> JavaConversions.asMap(config))
+      ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map))
     }
   }
+  
+  /**
+   * Read the topic config (if any) from zk
+   */
+  def fetchTopicConfig(zkClient: ZkClient, topic: String): Properties = {
+    val str: String = zkClient.readData(ZkUtils.getTopicConfigPath(topic), true)
+    val props = new Properties()
+    if(str != null) {
+      Json.parseFull(str) match {
+        case None => // there are no config overrides
+        case Some(map: Map[String, _]) => 
+          require(map("version") == 1)
+          map.get("config") match {
+            case Some(config: Map[String, String]) =>
+              for((k,v) <- config)
+                props.setProperty(k, v)
+            case _ => throw new IllegalArgumentException("Invalid topic config: " + str)
+          }
+
+        case o => throw new IllegalArgumentException("Unexpected value in config: "  + str)
+      }
+    }
+    props
+  }
+
+  def fetchAllTopicConfigs(zkClient: ZkClient): Map[String, Properties] =
+    ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchTopicConfig(zkClient, topic))).toMap
 
   def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
     fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])
@@ -158,12 +255,8 @@ object AdminUtils extends Logging {
     }
   }
 
-  private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
+  private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
     val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
     (firstReplicaIndex + shift) % nBrokers
   }
 }
-
-class AdministrationException(val errorMessage: String) extends RuntimeException(errorMessage) {
-  def this() = this(null)
-}
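
As the writeTopicConfig and fetchTopicConfig additions above show, any per-topic overrides are stored as a small version-1 JSON object at the topic's config path in ZooKeeper. A hedged sketch of reading them back (the address and topic name are illustrative):

    import org.I0Itec.zkclient.ZkClient
    import kafka.admin.AdminUtils
    import kafka.utils.ZKStringSerializer

    val zk = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
    // fetchTopicConfig reads the config znode, whose value looks roughly like
    //   {"version": 1, "config": {"max.message.bytes": "64000"}}
    // and returns the overrides as java.util.Properties (empty if none are set)
    val overrides = AdminUtils.fetchTopicConfig(zk, "my-topic")
    println(overrides)
    zk.close()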

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
deleted file mode 100644
index e762115..0000000
--- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import joptsimple.OptionParser
-import kafka.utils._
-import org.I0Itec.zkclient.ZkClient
-import scala.collection.mutable
-import kafka.common.Topic
-
-object CreateTopicCommand extends Logging {
-
-  def main(args: Array[String]): Unit = {
-    val parser = new OptionParser
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be created.")
-                         .withRequiredArg
-                         .describedAs("topic")
-                         .ofType(classOf[String])
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
-                                      "Multiple URLS can be given to allow fail-over.")
-                           .withRequiredArg
-                           .describedAs("urls")
-                           .ofType(classOf[String])
-    val nPartitionsOpt = parser.accepts("partition", "number of partitions in the topic")
-                           .withRequiredArg
-                           .describedAs("# of partitions")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1)
-    val replicationFactorOpt = parser.accepts("replica", "replication factor for each partitions in the topic")
-                           .withRequiredArg
-                           .describedAs("replication factor")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1)
-    val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "for manually assigning replicas to brokers")
-                           .withRequiredArg
-                           .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " +
-                                        "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...")
-                           .ofType(classOf[String])
-                           .defaultsTo("")
-
-    val options = parser.parse(args : _*)
-
-    for(arg <- List(topicOpt, zkConnectOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-
-    val topic = options.valueOf(topicOpt)
-    val zkConnect = options.valueOf(zkConnectOpt)
-    val nPartitions = options.valueOf(nPartitionsOpt).intValue
-    val replicationFactor = options.valueOf(replicationFactorOpt).intValue
-    val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt)
-    var zkClient: ZkClient = null
-    try {
-      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
-      println("creation succeeded!")
-    } catch {
-      case e =>
-        println("creation failed because of " + e.getMessage)
-        println(Utils.stackTrace(e))
-    } finally {
-      if (zkClient != null)
-        zkClient.close()
-    }
-  }
-
-  def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
-    Topic.validate(topic)
-
-    val brokerList = ZkUtils.getSortedBrokerList(zkClient)
-
-    val partitionReplicaAssignment = if (replicaAssignmentStr == "")
-      AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
-    else
-      getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
-    debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment))
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient)
-  }
-
-  def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int]): Map[Int, List[Int]] = {
-    val partitionList = replicaAssignmentList.split(",")
-    val ret = new mutable.HashMap[Int, List[Int]]()
-    for (i <- 0 until partitionList.size) {
-      val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
-      if (brokerList.size <= 0)
-        throw new AdministrationException("replication factor must be larger than 0")
-      if (brokerList.size != brokerList.toSet.size)
-        throw new AdministrationException("duplicate brokers in replica assignment: " + brokerList)
-      if (!brokerList.toSet.subsetOf(availableBrokerList))
-        throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList.toString +
-                "available broker:" + availableBrokerList.toString)
-      ret.put(i, brokerList.toList)
-      if (ret(i).size != ret(0).size)
-        throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList)
-    }
-    ret.toMap
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index ebcf669..49342c6 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -85,7 +85,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
           val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
           TopicAndPartition(topic, partition)
         }
-      case None => throw new AdministrationException("Preferred replica election data is empty")
+      case None => throw new AdminOperationException("Preferred replica election data is empty")
     }
   }
 
@@ -102,9 +102,9 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       case nee: ZkNodeExistsException =>
         val partitionsUndergoingPreferredReplicaElection =
           PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(ZkUtils.readData(zkClient, zkPath)._1)
-        throw new AdministrationException("Preferred replica leader election currently in progress for " +
+        throw new AdminOperationException("Preferred replica leader election currently in progress for " +
           "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
-      case e2 => throw new AdministrationException(e2.toString)
+      case e2 => throw new AdminOperationException(e2.toString)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
new file mode 100644
index 0000000..d364608
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple._
+import java.util.Properties
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import scala.collection._
+import scala.collection.JavaConversions._
+import kafka.common.Topic
+import kafka.cluster.Broker
+
+object TopicCommand {
+
+  def main(args: Array[String]): Unit = {
+    
+    val opts = new TopicCommandOptions(args)
+    
+    // should have exactly one action
+    val actions = Seq(opts.createOpt, opts.deleteOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _)
+    if(actions != 1) {
+      System.err.println("Command must include exactly one action: --list, --describe, --create, --delete, or --alter")
+      opts.parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+      
+    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
+    
+    val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
+    
+    if(opts.options.has(opts.createOpt))
+      createTopic(zkClient, opts)
+    else if(opts.options.has(opts.alterOpt))
+      alterTopic(zkClient, opts)
+    else if(opts.options.has(opts.deleteOpt))
+      deleteTopic(zkClient, opts)
+    else if(opts.options.has(opts.listOpt))
+      listTopics(zkClient)
+    else if(opts.options.has(opts.describeOpt))
+      describeTopic(zkClient, opts)
+
+    zkClient.close()
+  }
+
+  def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
+    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
+    val topics = opts.options.valuesOf(opts.topicOpt)
+    val configs = parseTopicConfigs(opts)
+    for (topic <- topics) {
+      if (opts.options.has(opts.replicaAssignmentOpt)) {
+        val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
+        AdminUtils.createTopicWithAssignment(zkClient, topic, assignment, configs)
+      } else {
+        CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
+        val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
+        val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
+        AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
+      }
+      println("Created topic \"%s\".".format(topic))
+    }
+  }
+  
+  def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
+    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
+    val topics = opts.options.valuesOf(opts.topicOpt)
+    val configs = parseTopicConfigs(opts)
+    if(opts.options.has(opts.partitionsOpt))
+      Utils.croak("Changing the number of partitions is not supported.")
+    if(opts.options.has(opts.replicationFactorOpt))
+      Utils.croak("Changing the replication factor is not supported.")
+    for(topic <- topics) {
+      AdminUtils.changeTopicConfig(zkClient, topic, configs)
+      println("Updated config for topic \"%s\".".format(topic))
+    }
+  }
+  
+  def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
+    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
+    for(topic <- opts.options.valuesOf(opts.topicOpt)) {
+      AdminUtils.deleteTopic(zkClient, topic)
+      println("Topic \"%s\" deleted.".format(topic))
+    }
+  }
+  
+  def listTopics(zkClient: ZkClient) {
+    for(topic <- ZkUtils.getAllTopics(zkClient).sorted)
+      println(topic)
+  }
+  
+  def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
+    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
+    val topics = opts.options.valuesOf(opts.topicOpt)
+    val metadata = AdminUtils.fetchTopicMetadataFromZk(topics.toSet, zkClient)
+    for(md <- metadata) {
+      println(md.topic)
+      val config = AdminUtils.fetchTopicConfig(zkClient, md.topic)
+      println("\tconfigs: " + config.map(kv => kv._1 + " = " + kv._2).mkString(", "))
+      println("\tpartitions: " + md.partitionsMetadata.size)
+      for(pd <- md.partitionsMetadata) {
+        println("\t\tpartition " + pd.partitionId)
+        println("\t\tleader: " + (if(pd.leader.isDefined) formatBroker(pd.leader.get) else "none"))
+        println("\t\treplicas: " + pd.replicas.map(formatBroker).mkString(", "))
+        println("\t\tisr: " + pd.isr.map(formatBroker).mkString(", "))
+      }
+    }
+  }
+  
+  def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
+  
+  def parseTopicConfigs(opts: TopicCommandOptions): Properties = {
+    val configs = opts.options.valuesOf(opts.configOpt).map(_.split("\\s*=\\s*"))
+    require(configs.forall(_.length == 2), "Invalid topic config: all configs must be in the format \"key=val\".")
+    val props = new Properties
+    configs.foreach(pair => props.setProperty(pair(0), pair(1)))
+    props
+  }
+  
+  def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
+    val partitionList = replicaAssignmentList.split(",")
+    val ret = new mutable.HashMap[Int, List[Int]]()
+    for (i <- 0 until partitionList.size) {
+      val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
+      ret.put(i, brokerList.toList)
+      if (ret(i).size != ret(0).size)
+        throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList)
+    }
+    ret.toMap
+  }
+  
+  class TopicCommandOptions(args: Array[String]) {
+    val parser = new OptionParser
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+                                      "Multiple URLS can be given to allow fail-over.")
+                           .withRequiredArg
+                           .describedAs("urls")
+                           .ofType(classOf[String])
+    val listOpt = parser.accepts("list", "List all available topics.")
+    val createOpt = parser.accepts("create", "Create a new topic.")
+    val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.")
+    val deleteOpt = parser.accepts("delete", "Delete the topic.")
+    val describeOpt = parser.accepts("describe", "List details for the given topics.")
+    val helpOpt = parser.accepts("help", "Print usage information.")
+    val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe.")
+                         .withRequiredArg
+                         .describedAs("topic")
+                         .ofType(classOf[String])
+    val configOpt = parser.accepts("config", "A topic configuration for the topic being created or altered.")
+                          .withRequiredArg
+                          .describedAs("name=value")
+                          .ofType(classOf[String])
+    val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created.")
+                           .withRequiredArg
+                           .describedAs("# of partitions")
+                           .ofType(classOf[java.lang.Integer])
+    val replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic being created.")
+                           .withRequiredArg
+                           .describedAs("replication factor")
+                           .ofType(classOf[java.lang.Integer])
+    val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created.")
+                           .withRequiredArg
+                           .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
+                                        "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
+                           .ofType(classOf[String])
+    
+
+    val options = parser.parse(args : _*)
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index a24094a..367ccd5 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -17,9 +17,11 @@
 package kafka.cluster
 
 import scala.collection._
+import kafka.admin.AdminUtils
 import kafka.utils._
 import java.lang.Object
 import kafka.api.LeaderAndIsr
+import kafka.log.LogConfig
 import kafka.server.ReplicaManager
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
@@ -74,7 +76,8 @@ class Partition(val topic: String,
       case Some(replica) => replica
       case None =>
         if (isReplicaLocal(replicaId)) {
-          val log = logManager.getOrCreateLog(topic, partitionId)
+          val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic))
+          val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
           val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent) 
           val offset = checkpoint.read.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
           val localReplica = new Replica(replicaId, this, time, offset, Some(log))

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/common/KafkaZookeperClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/KafkaZookeperClient.scala b/core/src/main/scala/kafka/common/KafkaZookeperClient.scala
deleted file mode 100644
index bace1d2..0000000
--- a/core/src/main/scala/kafka/common/KafkaZookeperClient.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.common
-
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZKStringSerializer, ZKConfig}
-import java.util.concurrent.atomic.AtomicReference
-
-object KafkaZookeeperClient {
-  private val INSTANCE = new AtomicReference[ZkClient](null)
-
-  def getZookeeperClient(config: ZKConfig): ZkClient = {
-    // TODO: This cannot be a singleton since unit tests break if we do that
-//    INSTANCE.compareAndSet(null, new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-//                                              ZKStringSerializer))
-    INSTANCE.set(new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-                                              ZKStringSerializer))
-    INSTANCE.get()
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0cc03bb..631953f 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -49,7 +49,7 @@ import com.yammer.metrics.core.Gauge
  */
 @threadsafe
 class Log(val dir: File,
-          val config: LogConfig,
+          @volatile var config: LogConfig,
           val needsRecovery: Boolean,
           val scheduler: Scheduler,
           time: Time = SystemTime) extends Logging with KafkaMetricsGroup {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 5a10bef..dc42a74 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -17,7 +17,7 @@
 
 package kafka.log
 
-import java.io.File
+import java.util.Properties
 import scala.collection._
 import kafka.common._
 
@@ -46,6 +46,99 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
                      val indexInterval: Int = 4096,
                      val fileDeleteDelayMs: Long = 60*1000,
                      val minCleanableRatio: Double = 0.5,
-                     val dedupe: Boolean = false)
+                     val dedupe: Boolean = false) {
+  
+  def toProps: Properties = {
+    val props = new Properties()
+    import LogConfig._
+    props.put(SegmentBytesProp, segmentSize.toString)
+    props.put(SegmentMsProp, segmentMs.toString)
+    props.put(SegmentIndexBytesProp, maxIndexSize.toString)
+    props.put(FlushMessagesProp, flushInterval.toString)
+    props.put(FlushMsProp, flushMs.toString)
+    props.put(RetentionBytesProp, retentionSize.toString)
+    props.put(RententionMsProp, retentionMs.toString)
+    props.put(MaxMessageBytesProp, maxMessageSize.toString)
+    props.put(IndexIntervalBytesProp, indexInterval.toString)
+    props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
+    props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
+    props.put(CleanupPolicyProp, if(dedupe) "dedupe" else "delete")
+    props
+  }
+  
+}
+
+object LogConfig {
+  val SegmentBytesProp = "segment.bytes"
+  val SegmentMsProp = "segment.ms"
+  val SegmentIndexBytesProp = "segment.index.bytes"
+  val FlushMessagesProp = "flush.messages"
+  val FlushMsProp = "flush.ms"
+  val RetentionBytesProp = "retention.bytes"
+  val RententionMsProp = "retention.ms"
+  val MaxMessageBytesProp = "max.message.bytes"
+  val IndexIntervalBytesProp = "index.interval.bytes"
+  val FileDeleteDelayMsProp = "file.delete.delay.ms"
+  val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
+  val CleanupPolicyProp = "cleanup.policy"
+  
+  val ConfigNames = Set(SegmentBytesProp, 
+                        SegmentMsProp, 
+                        SegmentIndexBytesProp, 
+                        FlushMessagesProp, 
+                        FlushMsProp, 
+                        RetentionBytesProp, 
+                        RententionMsProp,
+                        MaxMessageBytesProp,
+                        IndexIntervalBytesProp,
+                        FileDeleteDelayMsProp,
+                        MinCleanableDirtyRatioProp,
+                        CleanupPolicyProp)
+    
+  
+  /**
+   * Parse the given properties instance into a LogConfig object
+   */
+  def fromProps(props: Properties): LogConfig = {
+    new LogConfig(segmentSize = props.getProperty(SegmentBytesProp).toInt,
+                  segmentMs = props.getProperty(SegmentMsProp).toLong,
+                  maxIndexSize = props.getProperty(SegmentIndexBytesProp).toInt,
+                  flushInterval = props.getProperty(FlushMessagesProp).toLong,
+                  flushMs = props.getProperty(FlushMsProp).toLong,
+                  retentionSize = props.getProperty(RetentionBytesProp).toLong,
+                  retentionMs = props.getProperty(RententionMsProp).toLong,
+                  maxMessageSize = props.getProperty(MaxMessageBytesProp).toInt,
+                  indexInterval = props.getProperty(IndexIntervalBytesProp).toInt,
+                  fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt,
+                  minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble,
+                  dedupe = props.getProperty(CleanupPolicyProp).trim.toLowerCase == "dedupe")
+  }
+  
+  /**
+   * Create a log config instance using the given properties and defaults
+   */
+  def fromProps(defaults: Properties, overrides: Properties): LogConfig = {
+    val props = new Properties(defaults)
+    props.putAll(overrides)
+    fromProps(props)
+  }
+  
+  /**
+   * Check that property names are valid
+   */
+  private def validateNames(props: Properties) {
+    for(name <- JavaConversions.asMap(props).keys)
+      require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name))
+  }
+  
+  /**
+   * Check that the given properties contain only valid log config names, and that all values can be parsed.
+   */
+  def validate(props: Properties) {
+    validateNames(props)
+    LogConfig.fromProps(LogConfig().toProps, props) // check that we can parse the values
+  }
+  
+}
                       
                      
\ No newline at end of file
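
A small sketch of the merge semantics of the new two-argument LogConfig.fromProps, which Partition.scala uses later in this patch to combine the broker's default log config with a topic's overrides fetched from ZooKeeper (the override value is illustrative):

    import java.util.Properties
    import kafka.log.LogConfig

    val defaults = LogConfig().toProps                // broker-wide defaults as Properties
    val overrides = new Properties()
    overrides.setProperty("segment.bytes", "104857600")

    // fromProps(defaults, overrides) layers the overrides on top of the defaults
    // via new Properties(defaults) + putAll(overrides), so any setting not
    // overridden for the topic falls back to the broker default
    val merged = LogConfig.fromProps(defaults, overrides)
    println(merged.segmentSize)                       // 104857600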

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 438d802..0d567e4 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -174,9 +174,8 @@ class LogManager(val logDirs: Array[File],
   /**
    * Get the log if it exists, otherwise return None
    */
-  def getLog(topic: String, partition: Int): Option[Log] = {
-    val topicAndPartiton = TopicAndPartition(topic, partition)
-    val log = logs.get(topicAndPartiton)
+  def getLog(topicAndPartition: TopicAndPartition): Option[Log] = {
+    val log = logs.get(topicAndPartition)
     if (log == null)
       None
     else
@@ -184,21 +183,10 @@ class LogManager(val logDirs: Array[File],
   }
 
   /**
-   * Create the log if it does not exist, otherwise just return it
-   */
-  def getOrCreateLog(topic: String, partition: Int): Log = {
-    val topicAndPartition = TopicAndPartition(topic, partition)
-    logs.get(topicAndPartition) match {
-      case null => createLogIfNotExists(topicAndPartition)
-      case log: Log => log
-    }
-  }
-
-  /**
    * Create a log for the given topic and the given partition
    * If the log already exists, just return a copy of the existing log
    */
-  private def createLogIfNotExists(topicAndPartition: TopicAndPartition): Log = {
+  def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = {
     logCreationLock synchronized {
       var log = logs.get(topicAndPartition)
       
@@ -211,12 +199,16 @@ class LogManager(val logDirs: Array[File],
       val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
       dir.mkdirs()
       log = new Log(dir, 
-                    defaultConfig,
+                    config,
                     needsRecovery = false,
                     scheduler,
                     time)
-      info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath))
       logs.put(topicAndPartition, log)
+      info("Created log for topic %s partition %d in %s with properties {%s}."
+           .format(topicAndPartition.topic, 
+                   topicAndPartition.partition, 
+                   dataDir.getAbsolutePath,
+                   JavaConversions.asMap(config.toProps).mkString(", ")))
       log
     }
   }
@@ -289,6 +281,11 @@ class LogManager(val logDirs: Array[File],
    * Get all the partition logs
    */
   def allLogs(): Iterable[Log] = logs.values
+  
+  /**
+   * Get a map of TopicAndPartition => Log
+   */
+  def logsByTopicPartition = logs.toMap
 
   /**
    * Flush any log which has exceeded its flush interval and has unwritten messages.

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 648d936..865f7b4 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -281,7 +281,7 @@ private[kafka] class Processor(val id: Int,
           debug("Ignoring response for closed socket.")
           close(key)
         }
-      }finally {
+      } finally {
         curr = requestChannel.receiveResponse(id)
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f30dca1..f8faf96 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import kafka.admin.{CreateTopicCommand, AdminUtils}
+import kafka.admin.AdminUtils
 import kafka.api._
 import kafka.message._
 import kafka.network._
@@ -25,6 +25,7 @@ import kafka.log._
 import kafka.utils.ZKGroupTopicDirs
 import org.apache.log4j.Logger
 import scala.collection._
+import java.util.Properties
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic._
 import kafka.metrics.KafkaMetricsGroup
@@ -367,7 +368,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
   
   def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
-    logManager.getLog(topicAndPartition.topic, topicAndPartition.partition) match {
+    logManager.getLog(topicAndPartition) match {
       case Some(log) => 
         fetchOffsetsBefore(log, timestamp, maxNumOffsets)
       case None => 
@@ -442,7 +443,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               /* check if auto creation of topics is turned on */
               if (config.autoCreateTopicsEnable) {
                 try {
-                  CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
+                  AdminUtils.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
                   info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
                                .format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
                 } catch {
@@ -478,24 +479,25 @@ class KafkaApis(val requestChannel: RequestChannel,
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Handling offset commit request " + offsetCommitRequest.toString)
     trace("Handling offset commit request " + offsetCommitRequest.toString)
-    val responseInfo = offsetCommitRequest.requestInfo.map( t => {
-      val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, t._1.topic)
-      try {
-        if(t._2.metadata != null && t._2.metadata.length > config.offsetMetadataMaxSize) {
-          (t._1, ErrorMapping.OffsetMetadataTooLargeCode)
-        } else {
-          ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
-            t._1.partition, t._2.offset.toString)
-          (t._1, ErrorMapping.NoError)
+    val responseInfo = offsetCommitRequest.requestInfo.map{
+      case (topicAndPartition, metaAndError) => {
+        val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
+        try {
+          if(metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
+            (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
+          } else {
+            ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
+              topicAndPartition.partition, metaAndError.offset.toString)
+            (topicAndPartition, ErrorMapping.NoError)
+          }
+        } catch {
+          case e => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
         }
-      } catch {
-        case e => 
-          (t._1, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
       }
-    })
+    }
     val response = new OffsetCommitResponse(responseInfo, 
-                                        offsetCommitRequest.correlationId,
-                                        offsetCommitRequest.clientId)
+                                            offsetCommitRequest.correlationId,
+                                            offsetCommitRequest.clientId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
@@ -506,7 +508,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Handling offset fetch request " + offsetFetchRequest.toString)
-    trace("Handling offset fetch request " + offsetFetchRequest.toString)
     val responseInfo = offsetFetchRequest.requestInfo.map( t => {
       val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic)
       try {
@@ -525,8 +526,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     })
     val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), 
-                                        offsetFetchRequest.correlationId,
-                                        offsetFetchRequest.clientId)
+                                           offsetFetchRequest.correlationId,
+                                           offsetFetchRequest.clientId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
new file mode 100644
index 0000000..a078707
--- /dev/null
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils._
+import org.apache.zookeeper.Watcher.Event.KeeperState
+import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
+import kafka.common._
+import java.net.InetAddress
+
+
+/**
+ * This class registers the broker in zookeeper to allow 
+ * other brokers and consumers to detect failures. It uses an ephemeral znode with the path:
+ *   /brokers/ids/[0...N] --> host:port
+ *   
+ * Right now our definition of health is fairly naive: if we are registered in zk we are healthy, otherwise
+ * we are dead.
+ */
+class KafkaHealthcheck(private val brokerId: Int, 
+                       private val host: String, 
+                       private val port: Int, 
+                       private val zkClient: ZkClient) extends Logging {
+
+  val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
+  
+  def startup() {
+    zkClient.subscribeStateChanges(new SessionExpireListener)
+    register()
+  }
+
+  /**
+   * Register this broker as "alive" in zookeeper
+   */
+  def register() {
+    val hostName = 
+      if(host == null || host.trim.isEmpty) 
+        InetAddress.getLocalHost.getCanonicalHostName 
+      else
+        host
+    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
+    ZkUtils.registerBrokerInZk(zkClient, brokerId, hostName, port, jmxPort)
+  }
+
+  /**
+   *  When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
+   *  connection for us. We need to re-register this broker in the broker registry.
+   */
+  class SessionExpireListener() extends IZkStateListener {
+    @throws(classOf[Exception])
+    def handleStateChanged(state: KeeperState) {
+      // do nothing, since zkclient will do reconnect for us.
+    }
+
+    /**
+     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
+     * any ephemeral nodes here.
+     *
+     * @throws Exception
+     *             On any error.
+     */
+    @throws(classOf[Exception])
+    def handleNewSession() {
+      info("re-registering broker info in ZK for broker " + brokerId)
+      register()
+      info("done re-registering broker")
+      info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
+    }
+  }
+
+}
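
For context (not part of the patch), here is a minimal sketch of how another process could read the registrations that KafkaHealthcheck creates, using the same zkclient library and the ZKStringSerializer from kafka.utils; the connect string and timeouts are made up for illustration:

import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
import scala.collection.JavaConversions._

object ListLiveBrokers {
  def main(args: Array[String]) {
    // illustrative connect string and timeouts
    val zkClient = new ZkClient("localhost:2181", 6000, 6000, ZKStringSerializer)
    try {
      // each live broker has an ephemeral child under /brokers/ids
      for (id <- zkClient.getChildren("/brokers/ids"))
        println("broker %s -> %s".format(id, zkClient.readData[String]("/brokers/ids/" + id)))
    } finally {
      zkClient.close()
    }
  }
}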

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 842dcf3..f0949c2 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -67,7 +67,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
       handler.shutdown
     for(thread <- threads)
       thread.join
-    info("shutted down completely")
+    info("shut down completely")
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index badd1f8..9fa432d 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import kafka.network.SocketServer
+import kafka.admin._
 import kafka.log.LogConfig
 import kafka.log.CleanerConfig
 import kafka.log.LogManager
@@ -39,7 +40,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
   var logManager: LogManager = null
-  var kafkaZookeeper: KafkaZooKeeper = null
+  var kafkaHealthcheck: KafkaHealthcheck = null
+  var topicConfigManager: TopicConfigManager = null
   var replicaManager: ReplicaManager = null
   var apis: KafkaApis = null
   var kafkaController: KafkaController = null
@@ -57,9 +59,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
     /* start scheduler */
     kafkaScheduler.startup()
+    
+    /* setup zookeeper */
+    zkClient = initZk()
 
     /* start log manager */
-    logManager = createLogManager(config)
+    logManager = createLogManager(zkClient)
     logManager.startup()
 
     socketServer = new SocketServer(config.brokerId,
@@ -68,31 +73,40 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                     config.numNetworkThreads,
                                     config.queuedMaxRequests,
                                     config.socketRequestMaxBytes)
+    socketServer.startup()
 
-    socketServer.startup
-
-    /* start client */
-    kafkaZookeeper = new KafkaZooKeeper(config)
-    // starting relevant replicas and leader election for partitions assigned to this broker
-    kafkaZookeeper.startup
-
-    info("Connecting to ZK: " + config.zkConnect)
-
-    replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager)
-
-    kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient)
-    apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId, config)
+    replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager)
+    kafkaController = new KafkaController(config, zkClient)
+    
+    /* start processing requests */
+    apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config)
     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
-    Mx4jLoader.maybeLoad
+   
+    Mx4jLoader.maybeLoad()
 
-    // start the replica manager
     replicaManager.startup()
-    // start the controller
+
     kafkaController.startup()
-    // register metrics beans
+    
+    topicConfigManager = new TopicConfigManager(zkClient, logManager)
+    topicConfigManager.startup()
+    
+    /* tell everyone we are alive */
+    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.hostName, config.port, zkClient)
+    kafkaHealthcheck.startup()
+
+    
     registerStats()
+    
     info("started")
   }
+  
+  private def initZk(): ZkClient = {
+    info("Connecting to zookeeper on " + config.zkConnect)
+    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+    ZkUtils.setupCommonPaths(zkClient)
+    zkClient
+  }
 
   /**
    *  Forces some dynamic jmx beans to be registered on server startup.
@@ -118,15 +132,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       Utils.swallow(kafkaScheduler.shutdown())
       if(apis != null)
         Utils.swallow(apis.close())
-      if(kafkaZookeeper != null)
-        Utils.swallow(kafkaZookeeper.shutdown())
       if(replicaManager != null)
         Utils.swallow(replicaManager.shutdown())
       if(logManager != null)
         Utils.swallow(logManager.shutdown())
-
       if(kafkaController != null)
         Utils.swallow(kafkaController.shutdown())
+      if(zkClient != null)
+        Utils.swallow(zkClient.close())
 
       shutdownLatch.countDown()
       info("shut down completed")
@@ -140,13 +153,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
   def getLogManager(): LogManager = logManager
   
-  private def createLogManager(config: KafkaConfig): LogManager = {
-    val topics = config.logCleanupPolicyMap.keys ++ 
-                 config.logSegmentBytesPerTopicMap.keys ++ 
-                 config.logFlushIntervalMsPerTopicMap.keys ++ 
-                 config.logRollHoursPerTopicMap.keys ++ 
-                 config.logRetentionBytesPerTopicMap.keys ++ 
-                 config.logRetentionHoursPerTopicMap.keys
+  private def createLogManager(zkClient: ZkClient): LogManager = {
     val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, 
                                      segmentMs = 60 * 60 * 1000 * config.logRollHours,
                                      flushInterval = config.logFlushIntervalMessages,
@@ -159,13 +166,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                      fileDeleteDelayMs = config.logDeleteDelayMs,
                                      minCleanableRatio = config.logCleanerMinCleanRatio,
                                      dedupe = config.logCleanupPolicy.trim.toLowerCase == "dedupe")
-    val logConfigs = for(topic <- topics) yield 
-      topic -> defaultLogConfig.copy(segmentSize = config.logSegmentBytesPerTopicMap.getOrElse(topic, config.logSegmentBytes), 
-                                     segmentMs = 60 * 60 * 1000 * config.logRollHoursPerTopicMap.getOrElse(topic, config.logRollHours),
-                                     flushMs = config.logFlushIntervalMsPerTopicMap.getOrElse(topic, config.logFlushIntervalMs).toLong,
-                                     retentionSize = config.logRetentionBytesPerTopicMap.getOrElse(topic, config.logRetentionBytes),
-                                     retentionMs = 60 * 60 * 1000 * config.logRetentionHoursPerTopicMap.getOrElse(topic, config.logRetentionHours),
-                                     dedupe = config.logCleanupPolicyMap.getOrElse(topic, config.logCleanupPolicy).trim.toLowerCase == "dedupe")
+    // read the per-topic log configurations from zookeeper and overlay them on the defaults
+    val defaultProps = defaultLogConfig.toProps
+    val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
     val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
                                       dedupeBufferSize = config.logCleanerDedupeBufferSize,
                                       ioBufferSize = config.logCleanerIoBufferSize,
@@ -174,7 +177,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                       backOffMs = config.logCleanerBackoffMs,
                                       enableCleaner = config.logCleanerEnable)
     new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
-                   topicConfigs = logConfigs.toMap,
+                   topicConfigs = configs,
                    defaultConfig = defaultLogConfig,
                    cleanerConfig = cleanerConfig,
                    flushCheckMs = config.logFlushSchedulerIntervalMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
deleted file mode 100644
index 0e6c656..0000000
--- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.utils._
-import org.apache.zookeeper.Watcher.Event.KeeperState
-import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
-import kafka.common._
-import java.net.InetAddress
-
-
-/**
- * Handles registering broker with zookeeper in the following path:
- *   /brokers/[0...N] --> host:port
- */
-class KafkaZooKeeper(config: KafkaConfig) extends Logging {
-
-  val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
-  private var zkClient: ZkClient = null
-
-   def startup() {
-     /* start client */
-     info("connecting to ZK: " + config.zkConnect)
-     zkClient = KafkaZookeeperClient.getZookeeperClient(config)
-     zkClient.subscribeStateChanges(new SessionExpireListener)
-     registerBrokerInZk()
-   }
-
-  private def registerBrokerInZk() {
-    val hostName = 
-      if(config.hostName == null || config.hostName.trim.isEmpty) 
-        InetAddress.getLocalHost.getCanonicalHostName 
-      else
-        config.hostName 
-    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
-    ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, jmxPort)
-  }
-
-  /**
-   *  When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
-   *  connection for us. We need to re-register this broker in the broker registry.
-   */
-  class SessionExpireListener() extends IZkStateListener {
-    @throws(classOf[Exception])
-    def handleStateChanged(state: KeeperState) {
-      // do nothing, since zkclient will do reconnect for us.
-    }
-
-    /**
-     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
-     * any ephemeral nodes here.
-     *
-     * @throws Exception
-     *             On any error.
-     */
-    @throws(classOf[Exception])
-    def handleNewSession() {
-      info("re-registering broker info in ZK for broker " + config.brokerId)
-      registerBrokerInZk()
-      info("done re-registering broker")
-      info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
-    }
-  }
-
-  def shutdown() {
-    if (zkClient != null) {
-      info("Closing zookeeper client...")
-      zkClient.close()
-    }
-  }
-
-  def getZookeeperClient = {
-    zkClient
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 573601f..765d3cb 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -175,7 +175,7 @@ class ReplicaManager(val config: KafkaConfig,
           case Some(leaderReplica) => leaderReplica
           case None =>
             throw new LeaderNotAvailableException("Leader not local for topic %s partition %d on broker %d"
-                    .format(topic, partitionId, config.brokerId))
+                                                  .format(topic, partitionId, config.brokerId))
         }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/server/TopicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala
new file mode 100644
index 0000000..5814cb7
--- /dev/null
+++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.Properties
+import scala.collection._
+import kafka.log._
+import kafka.utils._
+import kafka.admin.AdminUtils
+import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+
+/**
+ * This class initiates and carries out topic config changes.
+ * 
+ * It works as follows.
+ * 
+ * Config is stored under the path
+ *   /config/topics/<topic_name>
+ * This znode stores the topic-level overrides for this topic (but no defaults) in properties format.
+ * 
+ * To avoid watching all topics for changes, we instead have a notification path
+ *   /config/changes
+ * The TopicConfigManager has a child watch on this path.
+ * 
+ * To update a topic config we first update the topic config properties. Then we create a new sequential
+ * znode under the change path which contains the name of the topic that was updated, say
+ *   /config/changes/config_change_13321
+ *   
+ * This will fire a watcher on all brokers. That watcher works as follows: it reads all the config change notifications
+ * and keeps track of the highest change suffix number it has already applied. For any change it has applied previously,
+ * it checks whether the notification is older than a static expiration time (say 10 mins) and, if so, deletes it.
+ * For any new change it reads the new configuration, combines it with the defaults, and updates the log config
+ * for all logs it has for that topic (if any).
+ * 
+ * Note that config is always read from the config path in zk; the notification is just a trigger to do so. So if a broker is
+ * down and misses a change, that is fine: when it restarts it will load the full config anyway. Note also that
+ * if there are two consecutive config changes it is possible that only the last one will take effect (since by the time the
+ * broker reads the config, both changes may have been made). In this case the broker would needlessly refresh the config twice,
+ * but that is harmless.
+ * 
+ * On restart the config manager re-processes all notifications. This will usually be wasted work, but avoids any race conditions
+ * on startup where a change might be missed between the initial config load and registering for change notifications.
+ * 
+ */
+class TopicConfigManager(private val zkClient: ZkClient,
+                         private val logManager: LogManager,
+                         private val changeExpirationMs: Long = 10*60*1000,
+                         private val time: Time = SystemTime) extends Logging {
+  private var lastExecutedChange = -1L
+  
+  /**
+   * Begin watching for config changes
+   */
+  def startup() {
+    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.TopicConfigChangesPath)
+    zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener)
+    processAllConfigChanges()
+  }
+  
+  /**
+   * Process all config changes
+   */
+  private def processAllConfigChanges() {
+    val configChanges = zkClient.getChildren(ZkUtils.TopicConfigChangesPath)
+    processConfigChanges(JavaConversions.asBuffer(configChanges).sorted)
+  }
+
+  /**
+   * Process the given list of config changes
+   */
+  private def processConfigChanges(notifications: Seq[String]) {
+    if (notifications.size > 0) {
+      info("Processing %d topic config change notification(s)...".format(notifications.size))
+      val now = time.milliseconds
+      val logs = logManager.logsByTopicPartition.toBuffer
+      val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
+      val lastChangeId = notifications.map(changeNumber).max
+      for (notification <- notifications) {
+        val changeId = changeNumber(notification)
+        if (changeId > lastExecutedChange) {
+          val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
+          val (topicJson, stat) = ZkUtils.readData(zkClient, changeZnode)
+          val topic = topicJson.substring(1, topicJson.length - 1) // dequote
+          if (logsByTopic.contains(topic)) {
+            /* combine the default properties with the overrides in zk to create the new LogConfig */
+            val props = new Properties(logManager.defaultConfig.toProps)
+            props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
+            val logConfig = LogConfig.fromProps(props)
+            for (log <- logsByTopic(topic))
+              log.config = logConfig
+            lastExecutedChange = changeId
+            info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
+          } else if (now - stat.getCtime > changeExpirationMs) {
+            /* this change is now obsolete, try to delete it unless it is the last change left */
+            ZkUtils.deletePath(zkClient, changeZnode)
+          }
+        }
+      }
+    }
+  }
+    
+  /* get the change number from a change notification znode */
+  private def changeNumber(name: String): Long = name.substring(AdminUtils.TopicConfigChangeZnodePrefix.length).toLong
+  
+  /**
+   * A listener that applies config changes to logs
+   */
+  object ConfigChangeListener extends IZkChildListener {
+    override def handleChildChange(path: String, chillins: java.util.List[String]) {
+      try {
+        processConfigChanges(JavaConversions.asBuffer(chillins))
+      } catch {
+        case e: Exception => error("Error processing config change:", e)
+      }
+    }
+  }
+
+}
\ No newline at end of file
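
As an illustration of the znode layout described in the class comment above (a sketch, not part of the patch), an admin process can fire a change notification as follows. The connect string and topic name are made up; writing the override properties themselves is handled by the new kafka-topics.sh/AdminUtils code elsewhere in this commit and is omitted here:

import kafka.admin.AdminUtils
import kafka.utils.{Json, ZkUtils, ZKStringSerializer}
import org.I0Itec.zkclient.ZkClient

object FireTopicConfigChange {
  def main(args: Array[String]) {
    val zkClient = new ZkClient("localhost:2181", 6000, 6000, ZKStringSerializer)  // illustrative connect string
    try {
      val topic = "my-topic"
      // step 1 (omitted): write the topic's override properties to ZkUtils.getTopicConfigPath(topic)
      // step 2: create a sequential notification znode whose data is the JSON-quoted topic name;
      //         every broker's TopicConfigManager child watch on /config/changes then re-reads the config
      zkClient.createPersistentSequential(
        ZkUtils.TopicConfigChangesPath + "/" + AdminUtils.TopicConfigChangeZnodePrefix,
        Json.encode(topic))
    } finally {
      zkClient.close()
    }
  }
}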

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/utils/CommandLineUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index 21c5d4a..29f1209 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -10,7 +10,7 @@ object CommandLineUtils extends Logging {
     def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
       for(arg <- required) {
         if(!options.has(arg)) {
-          error("Missing required argument \"" + arg + "\"")
+          System.err.println("Missing required argument \"" + arg + "\"")
           parser.printHelpOn(System.err)
           System.exit(1)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/utils/Json.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala
index a114769..e300f60 100644
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ b/core/src/main/scala/kafka/utils/Json.scala
@@ -1,6 +1,7 @@
 package kafka.utils
 
 import kafka.common._
+import scala.collection._
 import util.parsing.json.JSON
 
 /**
@@ -11,6 +12,9 @@ object Json extends Logging {
   JSON.globalNumberParser = myConversionFunc
   val lock = new Object
 
+  /**
+   * Parse a JSON string into an object
+   */
   def parseFull(input: String): Option[Any] = {
     lock synchronized {
       try {
@@ -21,4 +25,31 @@ object Json extends Logging {
       }
     }
   }
+  
+  /**
+   * Encode an object into a JSON string. This method accepts any type T where
+   *   T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
+   * Any other type will result in an exception.
+   * 
+   * This method does not escape embedded quotes and does not properly handle non-ASCII characters. 
+   */
+  def encode(obj: Any): String = {
+    obj match {
+      case null => "null"
+      case b: Boolean => b.toString
+      case s: String => "\"" + s + "\""
+      case n: Number => n.toString
+      case m: Map[_, _] => 
+        "{" + 
+          m.map(elem => 
+            elem match {
+            case t: Tuple2[_,_] => encode(t._1) + ":" + encode(t._2)
+            case _ => throw new IllegalArgumentException("Invalid map element (" + elem + ") in " + obj)
+          }).mkString(",") + 
+      "}"
+      case a: Array[_] => encode(a.toSeq)
+      case i: Iterable[_] => "[" + i.map(encode).mkString(",") + "]"
+      case other: AnyRef => throw new IllegalArgumentException("Unknown argument of type " + other.getClass + ": " + other)
+    }
+  }
 }
\ No newline at end of file
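
A quick script-style sketch of what the new encoder produces, based on the code above (element order follows the maps' iteration order):

import kafka.utils.Json

// nested maps, numbers, booleans and strings are supported; string keys and values get quoted
val json = Json.encode(Map("version" -> 1, "config" -> Map("flush.ms" -> 1000, "policy" -> "delete")))
// json == "{\"version\":1,\"config\":{\"flush.ms\":1000,\"policy\":\"delete\"}}"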

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 37b3975..c8fdf4a 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -577,6 +577,25 @@ object Utils extends Logging {
   }
   
   /**
+   * Turn a properties map into a string
+   */
+  def asString(props: Properties): String = {
+    val writer = new StringWriter()
+    props.store(writer, "")
+    writer.toString
+  }
+  
+  /**
+   * Read some properties with the given default values
+   */
+  def readProps(s: String, defaults: Properties): Properties = {
+    val reader = new StringReader(s)
+    val props = new Properties(defaults)
+    props.load(reader)
+    props
+  }
+  
+  /**
    * Read a big-endian integer from a byte array
    */
   def readInt(bytes: Array[Byte], offset: Int): Int = {

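A small script-style round trip of the two new helpers; the property keys are illustrative only:

import java.util.Properties
import kafka.utils.Utils

val defaults = new Properties()
defaults.put("cleanup.policy", "delete")

val overrides = new Properties(defaults)
overrides.put("segment.bytes", "1048576")

val serialized = Utils.asString(overrides)            // only the explicit overrides are written out
val restored = Utils.readProps(serialized, defaults)

assert(restored.getProperty("segment.bytes") == "1048576")    // from the serialized string
assert(restored.getProperty("cleanup.policy") == "delete")    // falls through to the defaults
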
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index f0aba12..c6119d9 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -38,6 +38,8 @@ object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
+  val TopicConfigPath = "/config/topics"
+  val TopicConfigChangesPath = "/config/changes"
   val ControllerPath = "/controller"
   val ControllerEpochPath = "/controller_epoch"
   val ReassignPartitionsPath = "/admin/reassign_partitions"
@@ -51,6 +53,9 @@ object ZkUtils extends Logging {
     getTopicPath(topic) + "/partitions"
   }
 
+  def getTopicConfigPath(topic: String): String = 
+    TopicConfigPath + "/" + topic
+  
   def getController(zkClient: ZkClient): Int= {
     readDataMaybeNull(zkClient, ControllerPath)._1 match {
       case Some(controller) => controller.toInt
@@ -58,17 +63,14 @@ object ZkUtils extends Logging {
     }
   }
 
-  def getTopicPartitionPath(topic: String, partitionId: Int): String ={
+  def getTopicPartitionPath(topic: String, partitionId: Int): String =
     getTopicPartitionsPath(topic) + "/" + partitionId
-  }
 
-  def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String ={
+  def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String =
     getTopicPartitionPath(topic, partitionId) + "/" + "state"
-  }
 
-  def getSortedBrokerList(zkClient: ZkClient): Seq[Int] ={
+  def getSortedBrokerList(zkClient: ZkClient): Seq[Int] =
     ZkUtils.getChildren(zkClient, BrokerIdsPath).map(_.toInt).sorted
-  }
 
   def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
     val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
@@ -89,6 +91,11 @@ object ZkUtils extends Logging {
   def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
     getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr)
   }
+  
+  def setupCommonPaths(zkClient: ZkClient) {
+    for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath))
+      makeSurePersistentPathExists(zkClient, path)
+  }
 
   def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat)
   : Option[LeaderIsrAndControllerEpoch] = {
@@ -179,7 +186,7 @@ object ZkUtils extends Logging {
     debug("The list of replicas for topic %s, partition %d is %s".format(topic, partition, replicas))
     replicas.contains(brokerId.toString)
   }
-
+    
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val brokerInfo =
@@ -312,10 +319,8 @@ object ZkUtils extends Logging {
           case e: ZkNodeExistsException =>
             stat = client.writeData(path, data)
             return  stat.getVersion
-          case e2 => throw e2
         }
       }
-      case e2 => throw e2
     }
   }
 
@@ -596,7 +601,7 @@ object ZkUtils extends Logging {
           case nne: ZkNoNodeException =>
             ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
             debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
-          case e2 => throw new AdministrationException(e2.toString)
+          case e2 => throw new AdminOperationException(e2.toString)
         }
     }
   }

