kafka-commits mailing list archives

From jun...@apache.org
Subject [36/36] git commit: merge from 0.8 da4512174b6f7c395ffe053a86d2c6bb19d2538a and resolve conflicts
Date Wed, 11 Sep 2013 17:04:21 GMT
merge from 0.8 da4512174b6f7c395ffe053a86d2c6bb19d2538a and resolve conflicts


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/75d95d9b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/75d95d9b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/75d95d9b

Branch: refs/heads/trunk
Commit: 75d95d9b0770d187916bed1dd1ca7a3e82ef08b0
Parents: ff62929 da45121
Author: Jun Rao <junrao@gmail.com>
Authored: Wed Sep 11 09:49:48 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Sep 11 09:49:48 2013 -0700

----------------------------------------------------------------------
 README.md                                       |  55 ++--
 bin/kafka-add-partitions.sh                     |  18 ++
 bin/kafka-check-reassignment-status.sh          |   3 +-
 bin/kafka-console-consumer-log4j.properties     |  21 --
 bin/kafka-console-consumer.sh                   |   5 +-
 bin/kafka-console-producer.sh                   |   4 +-
 bin/kafka-consumer-perf-test.sh                 |   1 +
 bin/kafka-preferred-replica-election.sh         |   3 +-
 bin/kafka-producer-perf-test.sh                 |   1 +
 bin/kafka-reassign-partitions.sh                |   3 +-
 bin/kafka-replay-log-producer.sh                |   4 +-
 bin/kafka-run-class.sh                          |  53 +++-
 bin/kafka-server-start.sh                       |   8 +-
 bin/kafka-server-stop.sh                        |   2 +-
 bin/kafka-simple-consumer-perf-test.sh          |   1 +
 bin/kafka-topics.sh                             |   4 +-
 bin/zookeeper-server-start.sh                   |   5 +-
 config/log4j.properties                         |   8 +-
 config/producer.properties                      |   2 +-
 config/server.properties                        |  19 +-
 config/tools-log4j.properties                   |  22 ++
 contrib/hadoop-producer/README.md               |   4 +-
 .../kafka/bridge/hadoop/KafkaOutputFormat.java  |  12 +-
 .../kafka/bridge/hadoop/KafkaRecordWriter.java  |  20 +-
 .../kafka/admin/AddPartitionsCommand.scala      | 127 +++++++++
 .../src/main/scala/kafka/admin/AdminUtils.scala |  59 ++--
 .../kafka/admin/ReassignPartitionsCommand.scala |  94 +++++--
 .../main/scala/kafka/admin/TopicCommand.scala   |   2 +-
 .../scala/kafka/api/LeaderAndIsrRequest.scala   |  14 +-
 core/src/main/scala/kafka/cluster/Broker.scala  |   2 -
 .../main/scala/kafka/cluster/Partition.scala    |  21 +-
 .../kafka/consumer/ConsumerFetcherManager.scala |  17 +-
 .../kafka/consumer/PartitionTopicInfo.scala     |   2 +
 .../scala/kafka/consumer/SimpleConsumer.scala   |   2 +-
 .../consumer/ZookeeperConsumerConnector.scala   | 273 ++++++++++---------
 .../controller/ControllerChannelManager.scala   |   7 +-
 .../kafka/controller/KafkaController.scala      | 164 ++++++-----
 .../controller/PartitionStateMachine.scala      |  32 ++-
 .../kafka/controller/ReplicaStateMachine.scala  |   9 +-
 .../scala/kafka/network/RequestChannel.scala    |  24 +-
 .../main/scala/kafka/network/SocketServer.scala |  32 ++-
 .../kafka/producer/ByteArrayPartitioner.scala   |  27 ++
 .../scala/kafka/producer/ConsoleProducer.scala  |  13 +-
 .../kafka/producer/DefaultPartitioner.scala     |   2 +-
 .../main/scala/kafka/producer/Producer.scala    |  33 ++-
 .../producer/async/DefaultEventHandler.scala    | 102 +++----
 .../kafka/server/AbstractFetcherThread.scala    |   8 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  20 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |   4 +-
 .../scala/kafka/server/KafkaHealthcheck.scala   |   5 +-
 .../kafka/server/KafkaRequestHandler.scala      |   2 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   2 +-
 .../scala/kafka/server/ReplicaManager.scala     |   6 +-
 .../kafka/server/ZookeeperLeaderElector.scala   |  45 ++-
 .../scala/kafka/tools/ImportZkOffsets.scala     |   3 -
 .../main/scala/kafka/tools/MirrorMaker.scala    |  62 +++--
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  81 +++++-
 .../unit/kafka/admin/AddPartitionsTest.scala    | 251 +++++++++++++++++
 .../test/scala/unit/kafka/admin/AdminTest.scala |  22 +-
 .../kafka/consumer/ConsumerIteratorTest.scala   |   2 +-
 .../unit/kafka/integration/FetcherTest.scala    |   2 +-
 .../kafka/integration/RollingBounceTest.scala   |   8 +-
 .../unit/kafka/producer/ProducerTest.scala      |   8 +-
 .../unit/kafka/producer/SyncProducerTest.scala  |  27 ++
 .../unit/kafka/server/LeaderElectionTest.scala  |   6 +-
 .../unit/kafka/server/LogRecoveryTest.scala     |   8 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   2 +-
 project/Build.scala                             |   2 +-
 .../0.7/lib/kafka-perf-0.7.0.jar                | Bin 52975 -> 55427 bytes
 .../config/migration_producer.properties        |   3 +
 .../migration_tool_test.py                      |   4 +-
 .../testcase_9001/testcase_9001_properties.json |   1 -
 .../testcase_9003/testcase_9003_properties.json |   1 -
 .../testcase_9004/testcase_9004_properties.json |   1 -
 .../testcase_9005/testcase_9005_properties.json |   2 -
 .../testcase_9006/testcase_9006_properties.json |   2 -
 .../mirror_maker_testsuite/mirror_maker_test.py |  10 +-
 .../replication_testsuite/replica_basic_test.py |  40 +--
 .../testcase_0001/testcase_0001_properties.json |   1 +
 .../testcase_0002/testcase_0002_properties.json |   1 +
 .../testcase_0003/testcase_0003_properties.json |   1 +
 .../testcase_0004/testcase_0004_properties.json |   1 +
 .../testcase_0005/testcase_0005_properties.json |   1 +
 .../testcase_0006/testcase_0006_properties.json |   1 +
 .../testcase_0007/testcase_0007_properties.json |   1 +
 .../testcase_0008/testcase_0008_properties.json |   1 +
 .../testcase_0009/testcase_0009_properties.json |   1 +
 .../testcase_0010/testcase_0010_properties.json |   1 +
 .../testcase_0011/testcase_0011_properties.json |   1 +
 .../testcase_0021/testcase_0021_properties.json |   2 +
 .../testcase_0022/testcase_0022_properties.json |   2 +
 .../testcase_0023/testcase_0023_properties.json |   2 +
 .../testcase_0111/testcase_0111_properties.json |   1 +
 .../testcase_0112/testcase_0112_properties.json |   1 +
 .../testcase_0113/testcase_0113_properties.json |   1 +
 .../testcase_0114/testcase_0114_properties.json |   1 +
 .../testcase_0115/testcase_0115_properties.json |   1 +
 .../testcase_0116/testcase_0116_properties.json |   1 +
 .../testcase_0117/testcase_0117_properties.json |   1 +
 .../testcase_0118/testcase_0118_properties.json |   1 +
 .../testcase_0121/testcase_0121_properties.json |   2 +
 .../testcase_0122/testcase_0122_properties.json |   2 +
 .../testcase_0123/testcase_0123_properties.json |   2 +
 .../testcase_0124/testcase_0124_properties.json |   2 +
 .../testcase_0125/testcase_0125_properties.json |   2 +
 .../testcase_0126/testcase_0126_properties.json |   2 +
 .../testcase_0127/testcase_0127_properties.json |   2 +
 .../testcase_0131/testcase_0131_properties.json |   2 +
 .../testcase_0132/testcase_0132_properties.json |   2 +
 .../testcase_0133/testcase_0133_properties.json |   2 +
 .../testcase_0201/testcase_0201_properties.json |   1 +
 .../testcase_0202/testcase_0202_properties.json |   1 +
 .../testcase_0203/testcase_0203_properties.json |   1 +
 .../testcase_0204/testcase_0204_properties.json |   1 +
 .../testcase_0205/testcase_0205_properties.json |   1 +
 .../testcase_0206/testcase_0206_properties.json |   1 +
 .../testcase_0207/testcase_0207_properties.json |   1 +
 .../testcase_0208/testcase_0208_properties.json |   1 +
 .../testcase_0251/testcase_0251_properties.json |   1 +
 .../testcase_0252/testcase_0252_properties.json |   1 +
 .../testcase_0253/testcase_0253_properties.json |   1 +
 .../testcase_0254/testcase_0254_properties.json |   1 +
 .../testcase_0255/testcase_0255_properties.json |   1 +
 .../testcase_0256/testcase_0256_properties.json |   1 +
 .../testcase_0257/testcase_0257_properties.json |   1 +
 .../testcase_0258/testcase_0258_properties.json |   1 +
 .../testcase_0301/testcase_0301_properties.json |   1 +
 .../testcase_0302/testcase_0302_properties.json |   1 +
 .../testcase_0303/testcase_0303_properties.json |   1 +
 .../testcase_0304/testcase_0304_properties.json |   1 +
 .../testcase_0305/testcase_0305_properties.json |   1 +
 .../testcase_0306/testcase_0306_properties.json |   1 +
 .../testcase_0307/testcase_0307_properties.json |   1 +
 .../testcase_0308/testcase_0308_properties.json |   1 +
 .../testcase_4001/testcase_4001_properties.json |   2 +
 .../testcase_4002/testcase_4002_properties.json |   2 +
 .../testcase_4003/testcase_4003_properties.json |   2 +
 .../testcase_4004/testcase_4004_properties.json |   2 +
 .../testcase_4005/testcase_4005_properties.json |   2 +
 .../testcase_4006/testcase_4006_properties.json |   2 +
 .../testcase_4007/testcase_4007_properties.json |   2 +
 .../testcase_4008/testcase_4008_properties.json |   2 +
 .../testcase_4011/testcase_4011_properties.json |   2 +
 .../testcase_4012/testcase_4012_properties.json |   2 +
 .../testcase_4013/testcase_4013_properties.json |   2 +
 .../testcase_4014/testcase_4014_properties.json |   2 +
 .../testcase_4015/testcase_4015_properties.json |   2 +
 .../testcase_4016/testcase_4016_properties.json |   2 +
 .../testcase_4017/testcase_4017_properties.json |   2 +
 .../testcase_4018/testcase_4018_properties.json |   2 +
 system_test/utils/kafka_system_test_utils.py    |  58 ++++
 151 files changed, 1545 insertions(+), 590 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/bin/kafka-topics.sh
----------------------------------------------------------------------
diff --cc bin/kafka-topics.sh
index b3195ee,0000000..cb76590
mode 100755,000000..100755
--- a/bin/kafka-topics.sh
+++ b/bin/kafka-topics.sh
@@@ -1,19 -1,0 +1,17 @@@
 +#!/bin/bash
 +# Licensed to the Apache Software Foundation (ASF) under one or more
 +# contributor license agreements.  See the NOTICE file distributed with
 +# this work for additional information regarding copyright ownership.
 +# The ASF licenses this file to You under the Apache License, Version 2.0
 +# (the "License"); you may not use this file except in compliance with
 +# the License.  You may obtain a copy of the License at
 +# 
 +#    http://www.apache.org/licenses/LICENSE-2.0
 +# 
 +# Unless required by applicable law or agreed to in writing, software
 +# distributed under the License is distributed on an "AS IS" BASIS,
 +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +# See the License for the specific language governing permissions and
 +# limitations under the License.
 +
- base_dir=$(dirname $0)
- export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
- $base_dir/kafka-run-class.sh kafka.admin.TopicCommand $@
++$(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/config/log4j.properties
----------------------------------------------------------------------
diff --cc config/log4j.properties
index c611786,dcf48f5..782124d
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@@ -36,15 -36,9 +36,15 @@@ log4j.appender.requestAppender.File=log
  log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
  log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
  
 +log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
 +log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
 +log4j.appender.cleanerAppender.File=log-cleaner.log
 +log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
 +log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 +
  log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
  log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
- log4j.appender.controllerAppender.File=controller.log
+ log4j.appender.controllerAppender.File=logs/controller.log
  log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
  log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
  

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/config/server.properties
----------------------------------------------------------------------
diff --cc config/server.properties
index 7685879,977b8da..2eccc5e
--- a/config/server.properties
+++ b/config/server.properties
@@@ -56,15 -56,23 +56,20 @@@ num.partitions=
  
  ############################# Log Flush Policy #############################
  
 -# The following configurations control the flush of data to disk. This is among the most
 -# important performance knob in kafka.
 +# Messages are immediately written to the filesystem but by default we only fsync() to sync
- # the OS cache lazily. The below configuration can enforce a more aggressive application level
- # fsync policy. This will have a significant performance impact.
++# the OS cache lazily. The following configurations control the flush of data to disk. 
+ # There are a few important trade-offs here:
+ #    1. Durability: Unflushed data may be lost if you are not using replication.
+ #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
 + #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+ # The settings below allow one to configure the flush policy to flush data after a period of time or
+ # every N messages (or both). This can be done globally and overridden on a per-topic basis.
  
  # The number of messages to accept before forcing a flush of data to disk
 -log.flush.interval.messages=10000
 +#log.flush.interval.messages=10000
  
  # The maximum amount of time a message can sit in a log before we force a flush
 -log.flush.interval.ms=1000
 -
 -# Per-topic overrides for log.flush.interval.ms
 -#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
 +#log.flush.interval.ms=1000
  
  ############################# Log Retention Policy #############################
  
@@@ -99,11 -107,4 +104,5 @@@ zookeeper.connect=localhost:218
  # Timeout in ms for connecting to zookeeper
  zookeeper.connection.timeout.ms=1000000
  
- # metrics reporter properties
- kafka.metrics.polling.interval.secs=5
- kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
- kafka.csv.metrics.dir=/tmp/kafka_metrics
- # Disable csv reporting by default.
- kafka.csv.metrics.reporter.enabled=false
  
 +log.cleanup.policy=delete

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
index 0000000,5757c32..fd41661
mode 000000,100644..100644
--- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
@@@ -1,0 -1,127 +1,127 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package kafka.admin
+ 
+ import joptsimple.OptionParser
+ import kafka.utils._
+ import org.I0Itec.zkclient.ZkClient
+ import scala.collection.mutable
+ import kafka.common.TopicAndPartition
+ 
+ object AddPartitionsCommand extends Logging {
+ 
+   def main(args: Array[String]): Unit = {
+     val parser = new OptionParser
+     val topicOpt = parser.accepts("topic", "REQUIRED: The topic for which partitions need to be added.")
+       .withRequiredArg
+       .describedAs("topic")
+       .ofType(classOf[String])
+     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+       "Multiple URLS can be given to allow fail-over.")
+       .withRequiredArg
+       .describedAs("urls")
+       .ofType(classOf[String])
+     val nPartitionsOpt = parser.accepts("partition", "REQUIRED: Number of partitions to add to the topic")
+       .withRequiredArg
+       .describedAs("# of partitions")
+       .ofType(classOf[java.lang.Integer])
+     val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "For manually assigning replicas to brokers for the new partitions")
+       .withRequiredArg
+       .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " +
+       "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...")
+       .ofType(classOf[String])
+       .defaultsTo("")
+ 
+     val options = parser.parse(args : _*)
+ 
+     for(arg <- List(topicOpt, zkConnectOpt, nPartitionsOpt)) {
+       if(!options.has(arg)) {
+         System.err.println("***Please note that this tool can only be used to add partitions when data for a topic does not use a key.***\n" +
+           "Missing required argument. " + " \"" + arg + "\"")
+         parser.printHelpOn(System.err)
+         System.exit(1)
+       }
+     }
+ 
+     val topic = options.valueOf(topicOpt)
+     val zkConnect = options.valueOf(zkConnectOpt)
+     val nPartitions = options.valueOf(nPartitionsOpt).intValue
+     val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt)
+     var zkClient: ZkClient = null
+     try {
+       zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+       addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
+       println("adding partitions succeeded!")
+     } catch {
+       case e =>
+         println("adding partitions failed because of " + e.getMessage)
+         println(Utils.stackTrace(e))
+     } finally {
+       if (zkClient != null)
+         zkClient.close()
+     }
+   }
+ 
+   def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") {
+     val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
+     if (existingPartitionsReplicaList.size == 0)
 -      throw new AdministrationException("The topic %s does not exist".format(topic))
++      throw new AdminOperationException("The topic %s does not exist".format(topic))
+ 
+     val existingReplicaList = existingPartitionsReplicaList.get(TopicAndPartition(topic,0)).get
+ 
+     // create the new partition replication list
+     val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+     val newPartitionReplicaList = if (replicaAssignmentStr == "")
+       AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
+     else
+       getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)
+ 
+     // check if manual assignment has the right replication factor
+     val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size))
+     if (unmatchedRepFactorList.size != 0)
 -      throw new AdministrationException("The replication factor in manual replication assignment " +
++      throw new AdminOperationException("The replication factor in manual replication assignment " +
+         " is not equal to the existing replication factor for the topic " + existingReplicaList.size)
+ 
+     info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
+     val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
+     // add the new list
+     partitionReplicaList ++= newPartitionReplicaList
 -    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaList, zkClient, true)
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
+   }
+ 
+   def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = {
+     val partitionList = replicaAssignmentList.split(",")
+     val ret = new mutable.HashMap[Int, List[Int]]()
+     var partitionId = startPartitionId
+     for (i <- 0 until partitionList.size) {
+       val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
+       if (brokerList.size <= 0)
 -        throw new AdministrationException("replication factor must be larger than 0")
++        throw new AdminOperationException("replication factor must be larger than 0")
+       if (brokerList.size != brokerList.toSet.size)
 -        throw new AdministrationException("duplicate brokers in replica assignment: " + brokerList)
++        throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
+       if (!brokerList.toSet.subsetOf(availableBrokerList))
 -        throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList.toString +
++        throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList.toString +
+           "available broker:" + availableBrokerList.toString)
+       ret.put(partitionId, brokerList.toList)
+       if (ret(partitionId).size != ret(startPartitionId).size)
 -        throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList)
++        throw new AdminOperationException("partition " + i + " has different replication factor: " + brokerList)
+       partitionId = partitionId + 1
+     }
+     ret.toMap
+   }
+ }
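
The new AddPartitionsCommand can also be driven programmatically. A minimal sketch, assuming a local ZooKeeper and a hypothetical topic name, based on the addPartitions signature above (the bin/kafka-add-partitions.sh wrapper added in this commit presumably forwards to this class):

    // Illustrative only: mirrors what AddPartitionsCommand.main does after option parsing.
    // The ZooKeeper address, topic name and partition count are hypothetical.
    import kafka.admin.AddPartitionsCommand
    import kafka.utils.ZKStringSerializer
    import org.I0Itec.zkclient.ZkClient

    object AddPartitionsExample {
      def main(args: Array[String]): Unit = {
        val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
        try {
          // Grow "my-topic" by two partitions; an empty replicaAssignmentStr lets the
          // command compute the replica placement from the live broker list.
          AddPartitionsCommand.addPartitions(zkClient, "my-topic", numPartitions = 2)
        } finally {
          zkClient.close()
        }
      }
    }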

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/admin/AdminUtils.scala
index 9ce89cd,c399bc7..83ba729
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@@ -52,74 -47,35 +52,78 @@@ object AdminUtils extends Logging 
    * p3        p4        p0        p1        p2       (3rd replica)
    * p7        p8        p9        p5        p6       (3rd replica)
     */
-   def assignReplicasToBrokers(brokers: Seq[Int], 
-                               partitions: Int, 
 -  def assignReplicasToBrokers(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int,
 -                              fixedStartIndex: Int = -1, startPartitionId: Int = -1)
++  def assignReplicasToBrokers(brokerList: Seq[Int],
++                              nPartitions: Int,
 +                              replicationFactor: Int,
-                               fixedStartIndex: Int = -1)  // for testing only
++                              fixedStartIndex: Int = -1,
++                              startPartitionId: Int = -1)
    : Map[Int, Seq[Int]] = {
-     if (partitions <= 0)
+     if (nPartitions <= 0)
 -      throw new AdministrationException("number of partitions must be larger than 0")
 +      throw new AdminOperationException("number of partitions must be larger than 0")
      if (replicationFactor <= 0)
 -      throw new AdministrationException("replication factor must be larger than 0")
 +      throw new AdminOperationException("replication factor must be larger than 0")
-     if (replicationFactor > brokers.size)
+     if (replicationFactor > brokerList.size)
 -      throw new AdministrationException("replication factor: " + replicationFactor +
 +      throw new AdminOperationException("replication factor: " + replicationFactor +
-         " larger than available brokers: " + brokers.size)
+         " larger than available brokers: " + brokerList.size)
      val ret = new mutable.HashMap[Int, List[Int]]()
-     val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokers.size)
+     val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
+     var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0
  
-     var secondReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokers.size)
-     for (i <- 0 until partitions) {
-       if (i > 0 && (i % brokers.size == 0))
-         secondReplicaShift += 1
-       val firstReplicaIndex = (i + startIndex) % brokers.size
-       var replicaList = List(brokers(firstReplicaIndex))
+     var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
+     for (i <- 0 until nPartitions) {
+       if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
+         nextReplicaShift += 1
+       val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
+       var replicaList = List(brokerList(firstReplicaIndex))
        for (j <- 0 until replicationFactor - 1)
-         replicaList ::= brokers(replicaIndex(firstReplicaIndex, secondReplicaShift, j, brokers.size))
-       ret.put(i, replicaList.reverse)
 -        replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
++        replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
+       ret.put(currentPartitionId, replicaList.reverse)
+       currentPartitionId = currentPartitionId + 1
      }
      ret.toMap
    }
 +  
 +  def deleteTopic(zkClient: ZkClient, topic: String) {
 +    zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
 +    zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
 +  }
 +  
 +  def topicExists(zkClient: ZkClient, topic: String): Boolean = 
 +    zkClient.exists(ZkUtils.getTopicPath(topic))
 +    
 +  def createTopic(zkClient: ZkClient,
 +                  topic: String,
 +                  partitions: Int, 
 +                  replicationFactor: Int, 
 +                  topicConfig: Properties = new Properties) {
 +    val brokerList = ZkUtils.getSortedBrokerList(zkClient)
 +    val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
-     AdminUtils.createTopicWithAssignment(zkClient, topic, replicaAssignment, topicConfig)
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig)
 +  }
 +                  
-   def createTopicWithAssignment(zkClient: ZkClient, 
-                                 topic: String, 
-                                 partitionReplicaAssignment: Map[Int, Seq[Int]], 
-                                 config: Properties = new Properties) {
++  def createOrUpdateTopicPartitionAssignmentPathInZK(zkClient: ZkClient,
++                                                     topic: String,
++                                                     partitionReplicaAssignment: Map[Int, Seq[Int]],
++                                                     config: Properties = new Properties,
++                                                     update: Boolean = false) {
 +    // validate arguments
 +    Topic.validate(topic)
 +    LogConfig.validate(config)
 +    require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.")
  
 -  def createOrUpdateTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[Int]], zkClient: ZkClient, update: Boolean = false) {
 +    val topicPath = ZkUtils.getTopicPath(topic)
-     if(zkClient.exists(topicPath))
++    if(!update && zkClient.exists(topicPath))
 +      throw new TopicExistsException("Topic \"%s\" already exists.".format(topic))
 +    partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: "  + partitionReplicaAssignment))
 +    
 +    // write out the config if there is any, this isn't transactional with the partition assignments
 +    writeTopicConfig(zkClient, topic, config)
 +    
 +    // create the partition assignment
-     writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment)
++    writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update)
 +  }
 +  
-   private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]]) {
++  private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
      try {
        val zkPath = ZkUtils.getTopicPath(topic)
        val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2)))
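
A minimal sketch of the reworked AdminUtils entry points above (assignReplicasToBrokers now takes a startPartitionId, and createTopic delegates to the unified createOrUpdateTopicPartitionAssignmentPathInZK); the broker ids, counts and topic name are illustrative only:

    // Illustrative use of the AdminUtils API as changed in this commit.
    import kafka.admin.AdminUtils
    import kafka.utils.ZKStringSerializer
    import org.I0Itec.zkclient.ZkClient

    object AdminUtilsExample {
      def main(args: Array[String]): Unit = {
        // Round-robin assignment: 10 partitions, 3 replicas spread over brokers 0-4.
        val assignment = AdminUtils.assignReplicasToBrokers(Seq(0, 1, 2, 3, 4), nPartitions = 10, replicationFactor = 3)
        assignment.toSeq.sortBy(_._1).foreach { case (partition, replicas) =>
          println("partition %d -> replicas %s".format(partition, replicas.mkString(",")))
        }

        // Or let createTopic compute the assignment and write it to ZooKeeper in one call.
        val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
        try {
          AdminUtils.createTopic(zkClient, "my-topic", partitions = 10, replicationFactor = 3)
        } finally {
          zkClient.close()
        }
      }
    }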

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/admin/TopicCommand.scala
index e604233,0000000..06bbd37
mode 100644,000000..100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@@ -1,207 -1,0 +1,207 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package kafka.admin
 +
 +import joptsimple._
 +import java.util.Properties
 +import kafka.utils._
 +import org.I0Itec.zkclient.ZkClient
 +import scala.collection._
 +import scala.collection.JavaConversions._
 +import kafka.common.Topic
 +import kafka.cluster.Broker
 +
 +object TopicCommand {
 +
 +  def main(args: Array[String]): Unit = {
 +    
 +    val opts = new TopicCommandOptions(args)
 +    
 +    // should have exactly one action
 +    val actions = Seq(opts.createOpt, opts.deleteOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _)
 +    if(actions != 1) {
 +      System.err.println("Command must include exactly one action: --list, --describe, --create, --delete, or --alter")
 +      opts.parser.printHelpOn(System.err)
 +      System.exit(1)
 +    }
 +      
 +    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
 +    
 +    val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
 +    
 +    if(opts.options.has(opts.createOpt))
 +      createTopic(zkClient, opts)
 +    else if(opts.options.has(opts.alterOpt))
 +      alterTopic(zkClient, opts)
 +    else if(opts.options.has(opts.deleteOpt))
 +      deleteTopic(zkClient, opts)
 +    else if(opts.options.has(opts.listOpt))
 +      listTopics(zkClient)
 +    else if(opts.options.has(opts.describeOpt))
 +      describeTopic(zkClient, opts)
 +
 +    zkClient.close()
 +  }
 +
 +  def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
 +    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
 +    val topics = opts.options.valuesOf(opts.topicOpt)
 +    val configs = parseTopicConfigs(opts)
 +    for (topic <- topics) {
 +      if (opts.options.has(opts.replicaAssignmentOpt)) {
 +        val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
-         AdminUtils.createTopicWithAssignment(zkClient, topic, assignment, configs)
++        AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs)
 +      } else {
 +        CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
 +        val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
 +        val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
 +        AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
 +      }
 +      println("Created topic \"%s\".".format(topic))
 +    }
 +  }
 +  
 +  def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
 +    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
 +    val topics = opts.options.valuesOf(opts.topicOpt)
 +    val configs = parseTopicConfigs(opts)
 +    if(opts.options.has(opts.partitionsOpt))
 +      Utils.croak("Changing the number of partitions is not supported.")
 +    if(opts.options.has(opts.replicationFactorOpt))
 +      Utils.croak("Changing the replication factor is not supported.")
 +    for(topic <- topics) {
 +      AdminUtils.changeTopicConfig(zkClient, topic, configs)
 +      println("Updated config for topic \"%s\".".format(topic))
 +    }
 +  }
 +  
 +  def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
 +    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
 +    for(topic <- opts.options.valuesOf(opts.topicOpt)) {
 +      AdminUtils.deleteTopic(zkClient, topic)
 +      println("Topic \"%s\" deleted.".format(topic))
 +    }
 +  }
 +  
 +  def listTopics(zkClient: ZkClient) {
 +    for(topic <- ZkUtils.getAllTopics(zkClient).sorted)
 +      println(topic)
 +  }
 +  
 +  def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
 +    var topics: Seq[String] = opts.options.valuesOf(opts.topicOpt).toSeq.sorted
 +    if (topics.size <= 0)
 +      topics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).sorted
 +    val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false
 +    val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false
 +    val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet
 +    for (topic <- topics) {
 +      ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match {
 +        case Some(topicPartitionAssignment) =>
 +          val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
 +          if (!reportUnavailablePartitions && !reportUnderReplicatedPartitions) {
 +            println(topic)
 +            val config = AdminUtils.fetchTopicConfig(zkClient, topic)
 +            println("\tconfigs: " + config.map(kv => kv._1 + " = " + kv._2).mkString(", "))
 +            println("\tpartitions: " + sortedPartitions.size)
 +          }
 +          for ((partitionId, assignedReplicas) <- sortedPartitions) {
 +            val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId)
 +            val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId)
 +            if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
 +                (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
 +                (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
 +              print("\t\ttopic: " + topic)
 +              print("\tpartition: " + partitionId)
 +              print("\tleader: " + (if(leader.isDefined) leader.get else "none"))
 +              print("\treplicas: " + assignedReplicas.mkString(","))
 +              println("\tisr: " + inSyncReplicas.mkString(","))
 +            }
 +          }
 +        case None =>
 +          println("topic " + topic + " doesn't exist!")
 +      }
 +    }
 +  }
 +  
 +  def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
 +  
 +  def parseTopicConfigs(opts: TopicCommandOptions): Properties = {
 +    val configs = opts.options.valuesOf(opts.configOpt).map(_.split("\\s*=\\s*"))
 +    require(configs.forall(_.length == 2), "Invalid topic config: all configs must be in the format \"key=val\".")
 +    val props = new Properties
 +    configs.foreach(pair => props.setProperty(pair(0), pair(1)))
 +    props
 +  }
 +  
 +  def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
 +    val partitionList = replicaAssignmentList.split(",")
 +    val ret = new mutable.HashMap[Int, List[Int]]()
 +    for (i <- 0 until partitionList.size) {
 +      val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
 +      ret.put(i, brokerList.toList)
 +      if (ret(i).size != ret(0).size)
 +        throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList)
 +    }
 +    ret.toMap
 +  }
 +  
 +  class TopicCommandOptions(args: Array[String]) {
 +    val parser = new OptionParser
 +    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
 +                                      "Multiple URLS can be given to allow fail-over.")
 +                           .withRequiredArg
 +                           .describedAs("urls")
 +                           .ofType(classOf[String])
 +    val listOpt = parser.accepts("list", "List all available topics.")
 +    val createOpt = parser.accepts("create", "Create a new topic.")
 +    val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.")
 +    val deleteOpt = parser.accepts("delete", "Delete the topic.")
 +    val describeOpt = parser.accepts("describe", "List details for the given topics.")
 +    val helpOpt = parser.accepts("help", "Print usage information.")
 +    val topicOpt = parser.accepts("topic", "The topic to create, alter, delete, or describe.")
 +                         .withRequiredArg
 +                         .describedAs("topic")
 +                         .ofType(classOf[String])
 +    val configOpt = parser.accepts("config", "A topic configuration for the topic being created or altered.")
 +                          .withRequiredArg
 +                          .describedAs("name=value")
 +                          .ofType(classOf[String])
 +    val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created.")
 +                           .withRequiredArg
 +                           .describedAs("# of partitions")
 +                           .ofType(classOf[java.lang.Integer])
 +    val replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic being created.")
 +                           .withRequiredArg
 +                           .describedAs("replication factor")
 +                           .ofType(classOf[java.lang.Integer])
 +    val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created.")
 +                           .withRequiredArg
 +                           .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
 +                                        "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
 +                           .ofType(classOf[String])
 +    val reportUnderReplicatedPartitionsOpt = parser.accepts("under-replicated-partitions",
 +                                                            "if set when describing topics, only show under replicated partitions")
 +    val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions",
 +                                                            "if set when describing topics, only show partitions whose leader is not available")
 +
 +
 +    val options = parser.parse(args : _*)
 +  }
 +  
 +}
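
For reference, a hedged sketch of how the consolidated TopicCommand might be exercised with the options declared in TopicCommandOptions above; bin/kafka-topics.sh (see the script diff earlier in this commit) simply forwards its arguments to this class. The ZooKeeper address, topic name and sizing are hypothetical:

    import kafka.admin.TopicCommand

    object TopicCommandExample {
      def main(args: Array[String]): Unit = {
        // Create a topic with 4 partitions and a replication factor of 2.
        TopicCommand.main(Array(
          "--zookeeper", "localhost:2181",
          "--create",
          "--topic", "my-topic",
          "--partitions", "4",
          "--replication-factor", "2"))

        // Describe it, printing leader, replicas and ISR for each partition.
        TopicCommand.main(Array(
          "--zookeeper", "localhost:2181",
          "--describe",
          "--topic", "my-topic"))
      }
    }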

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index e3a6420,e7a692a..08c9e4f
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@@ -85,8 -85,9 +85,9 @@@ private[kafka] class ZookeeperConsumerC
    private var fetcher: Option[ConsumerFetcherManager] = None
    private var zkClient: ZkClient = null
    private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
+   private var checkpointedOffsets = new Pool[TopicAndPartition, Long]
    private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
 -  private val scheduler = new KafkaScheduler(1)
 +  private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
    private val messageStreamCreated = new AtomicBoolean(false)
  
    private var sessionExpirationListener: ZKSessionExpireListener = null
@@@ -155,32 -153,34 +156,34 @@@
    }
  
    def shutdown() {
-     val canShutdown = isShuttingDown.compareAndSet(false, true);
-     if (canShutdown) {
-       info("ZKConsumerConnector shutting down")
+     rebalanceLock synchronized {
+       val canShutdown = isShuttingDown.compareAndSet(false, true);
+       if (canShutdown) {
+         info("ZKConsumerConnector shutting down")
  
-       if (wildcardTopicWatcher != null)
-         wildcardTopicWatcher.shutdown()
-       try {
-         if (config.autoCommitEnable)
-           scheduler.shutdown()
-         fetcher match {
-           case Some(f) => f.stopConnections
-           case None =>
-         }
-         sendShutdownToAllQueues()
-         if (config.autoCommitEnable)
-           commitOffsets()
-         if (zkClient != null) {
-           zkClient.close()
-           zkClient = null
+         if (wildcardTopicWatcher != null)
+           wildcardTopicWatcher.shutdown()
+         try {
+           if (config.autoCommitEnable)
 -	        scheduler.shutdownNow()
++	        scheduler.shutdown()
+           fetcher match {
+             case Some(f) => f.stopConnections
+             case None =>
+           }
+           sendShutdownToAllQueues()
+           if (config.autoCommitEnable)
+             commitOffsets()
+           if (zkClient != null) {
+             zkClient.close()
+             zkClient = null
+           }
+         } catch {
+           case e =>
+             fatal("error during consumer connector shutdown", e)
          }
-       } catch {
-         case e =>
-           fatal("error during consumer connector shutdown", e)
+         info("ZKConsumerConnector shut down completed")
        }
-       info("ZKConsumerConnector shut down completed")
 -	}
 +    }
    }
  
    def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
index 0000000,752a4fc..988e437
mode 000000,100644..100644
--- a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
@@@ -1,0 -1,27 +1,27 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package kafka.producer
+ 
+ 
+ import kafka.utils._
+ 
 -private class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner[Array[Byte]] {
 -  def partition(key: Array[Byte], numPartitions: Int): Int = {
 -    Utils.abs(java.util.Arrays.hashCode(key)) % numPartitions
++private class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner {
++  def partition(key: Any, numPartitions: Int): Int = {
++    Utils.abs(java.util.Arrays.hashCode(key.asInstanceOf[Array[Byte]])) % numPartitions
+   }
+ }

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/producer/DefaultPartitioner.scala
index 37ddd55,9bffeb6..3afb22e
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
@@@ -20,10 -20,10 +20,10 @@@ package kafka.produce
  
  import kafka.utils._
  
- private class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
 -private class DefaultPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
++class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
    private val random = new java.util.Random
    
 -  def partition(key: T, numPartitions: Int): Int = {
 +  def partition(key: Any, numPartitions: Int): Int = {
      Utils.abs(key.hashCode) % numPartitions
    }
  }
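
With this change the Partitioner trait is no longer generic: partition takes Any instead of a type parameter. A minimal sketch of a custom partitioner against the new signature (the class name and hashing rule are hypothetical):

    import kafka.producer.Partitioner
    import kafka.utils.{Utils, VerifiableProperties}

    // Illustrative partitioner: hashes the key's string form, mirroring DefaultPartitioner's shape.
    class StringKeyPartitioner(props: VerifiableProperties = null) extends Partitioner {
      def partition(key: Any, numPartitions: Int): Int =
        Utils.abs(key.toString.hashCode) % numPartitions
    }

Such a class would presumably be selected through the producer's partitioner.class property, the same mechanism by which DefaultPartitioner is picked up by default.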

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index a078707,0000000..84ea17a
mode 100644,000000..100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@@ -1,86 -1,0 +1,87 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package kafka.server
 +
 +import kafka.utils._
 +import org.apache.zookeeper.Watcher.Event.KeeperState
 +import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
 +import kafka.common._
 +import java.net.InetAddress
 +
 +
 +/**
 + * This class registers the broker in zookeeper to allow 
 + * other brokers and consumers to detect failures. It uses an ephemeral znode with the path:
 + *   /brokers/[0...N] --> host:port
 + *   
 + * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise
 + * we are dead.
 + */
 +class KafkaHealthcheck(private val brokerId: Int, 
 +                       private val host: String, 
-                        private val port: Int, 
++                       private val port: Int,
++                       private val zkSessionTimeoutMs: Int,
 +                       private val zkClient: ZkClient) extends Logging {
 +
 +  val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
 +  
 +  def startup() {
 +    zkClient.subscribeStateChanges(new SessionExpireListener)
 +    register()
 +  }
 +
 +  /**
 +   * Register this broker as "alive" in zookeeper
 +   */
 +  def register() {
 +    val hostName = 
 +      if(host == null || host.trim.isEmpty) 
 +        InetAddress.getLocalHost.getCanonicalHostName 
 +      else
 +        host
 +    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
-     ZkUtils.registerBrokerInZk(zkClient, brokerId, hostName, port, jmxPort)
++    ZkUtils.registerBrokerInZk(zkClient, brokerId, hostName, port, zkSessionTimeoutMs, jmxPort)
 +  }
 +
 +  /**
 +   *  When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
 +   *  connection for us. We need to re-register this broker in the broker registry.
 +   */
 +  class SessionExpireListener() extends IZkStateListener {
 +    @throws(classOf[Exception])
 +    def handleStateChanged(state: KeeperState) {
 +      // do nothing, since zkclient will do reconnect for us.
 +    }
 +
 +    /**
 +     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
 +     * any ephemeral nodes here.
 +     *
 +     * @throws Exception
 +     *             On any error.
 +     */
 +    @throws(classOf[Exception])
 +    def handleNewSession() {
 +      info("re-registering broker info in ZK for broker " + brokerId)
 +      register()
 +      info("done re-registering broker")
 +      info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/KafkaServer.scala
index a925ae1,a26de88..c148fdf
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@@ -80,39 -73,31 +80,39 @@@ class KafkaServer(val config: KafkaConf
                                      config.socketSendBufferBytes,
                                      config.socketReceiveBufferBytes,
                                      config.socketRequestMaxBytes)
 +    socketServer.startup()
  
 -    socketServer.startup
 -
 -    /* start client */
 -    kafkaZookeeper = new KafkaZooKeeper(config)
 -    // starting relevant replicas and leader election for partitions assigned to this broker
 -    kafkaZookeeper.startup
 -
 -    info("Connecting to ZK: " + config.zkConnect)
 -
 -    replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager, isShuttingDown)
 -
 -    kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient)
 -    apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId, kafkaController)
 +    replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
 +    kafkaController = new KafkaController(config, zkClient)
 +    
 +    /* start processing requests */
 +    apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
 -    Mx4jLoader.maybeLoad
 +   
 +    Mx4jLoader.maybeLoad()
  
 -    // start the replica manager
      replicaManager.startup()
 -    // start the controller
 +
      kafkaController.startup()
 -    // register metrics beans
 +    
 +    topicConfigManager = new TopicConfigManager(zkClient, logManager)
 +    topicConfigManager.startup()
 +    
 +    /* tell everyone we are alive */
-     kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.hostName, config.port, zkClient)
++    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.hostName, config.port, config.zkSessionTimeoutMs, zkClient)
 +    kafkaHealthcheck.startup()
 +
 +    
      registerStats()
      startupComplete.set(true);
 -    info("Started")
 +    info("started")
 +  }
 +  
 +  private def initZk(): ZkClient = {
 +    info("Connecting to zookeeper on " + config.zkConnect)
 +    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
 +    ZkUtils.setupCommonPaths(zkClient)
 +    zkClient
    }
  
    /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/utils/ZkUtils.scala
index 84744eb,ca1ce12..4094dcb
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@@ -54,12 -53,9 +55,12 @@@ object ZkUtils extends Logging 
      getTopicPath(topic) + "/partitions"
    }
  
 +  def getTopicConfigPath(topic: String): String = 
 +    TopicConfigPath + "/" + topic
 +  
    def getController(zkClient: ZkClient): Int= {
      readDataMaybeNull(zkClient, ControllerPath)._1 match {
-       case Some(controller) => controller.toInt
+       case Some(controller) => KafkaController.parseControllerId(controller)
        case None => throw new KafkaException("Controller doesn't exist")
      }
    }
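
A small sketch exercising the ZkUtils additions above (getTopicConfigPath and the controller lookup that now goes through KafkaController.parseControllerId); the ZooKeeper address and topic name are hypothetical:

    import kafka.utils.{ZkUtils, ZKStringSerializer}
    import org.I0Itec.zkclient.ZkClient

    object ZkUtilsExample {
      def main(args: Array[String]): Unit = {
        val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
        try {
          // New per-topic config znode path.
          println(ZkUtils.getTopicConfigPath("my-topic"))
          // Current controller's broker id; throws KafkaException if no controller is registered.
          println("controller broker id: " + ZkUtils.getController(zkClient))
        } finally {
          zkClient.close()
        }
      }
    }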

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 0000000,06be990..abcbed8
mode 000000,100644..100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@@ -1,0 -1,251 +1,251 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package kafka.admin
+ 
+ import org.scalatest.junit.JUnit3Suite
+ import kafka.zk.ZooKeeperTestHarness
+ import kafka.utils.TestUtils._
+ import junit.framework.Assert._
+ import kafka.utils.{ZkUtils, Utils, TestUtils}
+ import kafka.cluster.Broker
+ import kafka.client.ClientUtils
+ import kafka.server.{KafkaConfig, KafkaServer}
+ 
+ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
+   val brokerId1 = 0
+   val brokerId2 = 1
+   val brokerId3 = 2
+   val brokerId4 = 3
+ 
+   val port1 = TestUtils.choosePort()
+   val port2 = TestUtils.choosePort()
+   val port3 = TestUtils.choosePort()
+   val port4 = TestUtils.choosePort()
+ 
+   val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
+   val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
+   val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3)
+   val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4)
+ 
+   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+   var brokers: Seq[Broker] = Seq.empty[Broker]
+ 
+   val partitionId = 0
+ 
+   val topic1 = "new-topic1"
+   val topic2 = "new-topic2"
+   val topic3 = "new-topic3"
+   val topic4 = "new-topic4"
+ 
+   override def setUp() {
+     super.setUp()
+     // start all the servers
+     val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
+     val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
+     val server3 = TestUtils.createServer(new KafkaConfig(configProps3))
+     val server4 = TestUtils.createServer(new KafkaConfig(configProps4))
+ 
+     servers ++= List(server1, server2, server3, server4)
+     brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port))
+ 
+     // create topics with 1 partition, 2 replicas, one on each broker
 -    CreateTopicCommand.createTopic(zkClient, topic1, 1, 2, "0:1")
 -    CreateTopicCommand.createTopic(zkClient, topic2, 1, 2, "1:2")
 -    CreateTopicCommand.createTopic(zkClient, topic3, 1, 4, "2:3:0:1")
 -    CreateTopicCommand.createTopic(zkClient, topic4, 1, 4, "0:3")
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic1, Map(0->Seq(0,1)))
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic2, Map(0->Seq(1,2)))
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic3, Map(0->Seq(2,3,0,1)))
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic4, Map(0->Seq(0,3)))
+ 
+ 
+     // wait until leader is elected
+     var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partitionId, 500)
+     var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, partitionId, 500)
+     var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, partitionId, 500)
+     var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic4, partitionId, 500)
+ 
+     debug("Leader for " + topic1  + " is elected to be: %s".format(leader1.getOrElse(-1)))
+     debug("Leader for " + topic2 + " is elected to be: %s".format(leader1.getOrElse(-1)))
+     debug("Leader for " + topic3 + "is elected to be: %s".format(leader1.getOrElse(-1)))
+     debug("Leader for " + topic4 + "is elected to be: %s".format(leader1.getOrElse(-1)))
+ 
+     assertTrue("Leader should get elected", leader1.isDefined)
+     assertTrue("Leader should get elected", leader2.isDefined)
+     assertTrue("Leader should get elected", leader3.isDefined)
+     assertTrue("Leader should get elected", leader4.isDefined)
+ 
+     assertTrue("Leader could be broker 0 or broker 1 for " + topic1, (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
+     assertTrue("Leader could be broker 1 or broker 2 for " + topic2, (leader2.getOrElse(-1) == 1) || (leader1.getOrElse(-1) == 2))
+     assertTrue("Leader could be broker 2 or broker 3 for " + topic3, (leader3.getOrElse(-1) == 2) || (leader1.getOrElse(-1) == 3))
+     assertTrue("Leader could be broker 3 or broker 4 for " + topic4, (leader4.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 3))
+   }
+ 
+   override def tearDown() {
+     servers.map(server => server.shutdown())
+     servers.map(server => Utils.rm(server.config.logDirs))
+     super.tearDown()
+   }
+ 
+   def testTopicDoesNotExist {
+     try {
+       AddPartitionsCommand.addPartitions(zkClient, "Blah", 1)
+       fail("Topic should not exist")
+     } catch {
 -      case e: AdministrationException => //this is good
++      case e: AdminOperationException => //this is good
+       case e2 => throw e2
+     }
+   }
+ 
+   def testWrongReplicaCount {
+     try {
+       AddPartitionsCommand.addPartitions(zkClient, topic1, 2, "0:1:2")
+       fail("Add partitions should fail")
+     } catch {
 -      case e: AdministrationException => //this is good
++      case e: AdminOperationException => //this is good
+       case e2 => throw e2
+     }
+   }
+ 
+   def testIncrementPartitions {
+     AddPartitionsCommand.addPartitions(zkClient, topic1, 2)
+     // wait until leader is elected
+     var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1, 500)
+     var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2, 500)
+     val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 1).get
+     val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 2).get
+     assertEquals(leader1.get, leader1FromZk)
+     assertEquals(leader2.get, leader2FromZk)
+ 
+     // read metadata from a broker and verify the new topic partitions exist
+     TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1, 1000)
+     TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2, 1000)
+     val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions",
+       2000,0).topicsMetadata
+     val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1))
+     val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata
+     assertEquals(partitionDataForTopic1.size, 3)
+     assertEquals(partitionDataForTopic1(1).partitionId, 1)
+     assertEquals(partitionDataForTopic1(2).partitionId, 2)
+     val replicas = partitionDataForTopic1(1).replicas
+     assertEquals(replicas.size, 2)
+     assert(replicas.contains(partitionDataForTopic1(1).leader.get))
+   }
+ 
+   def testManualAssignmentOfReplicas {
+     AddPartitionsCommand.addPartitions(zkClient, topic2, 2, "0:1,2:3")
+     // wait until leader is elected
+     var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1, 500)
+     var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2, 500)
+     val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 1).get
+     val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 2).get
+     assertEquals(leader1.get, leader1FromZk)
+     assertEquals(leader2.get, leader2FromZk)
+ 
+     // read metadata from a broker and verify the new topic partitions exist
+     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1, 1000)
+     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2, 1000)
+     val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas",
+       2000,0).topicsMetadata
+     val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2))
+     val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata
+     assertEquals(partitionDataForTopic2.size, 3)
+     assertEquals(partitionDataForTopic2(1).partitionId, 1)
+     assertEquals(partitionDataForTopic2(2).partitionId, 2)
+     val replicas = partitionDataForTopic2(1).replicas
+     assertEquals(replicas.size, 2)
+     assert(replicas(0).id == 0 || replicas(0).id == 1)
+     assert(replicas(1).id == 0 || replicas(1).id == 1)
+   }
+ 
+   def testReplicaPlacement {
+     AddPartitionsCommand.addPartitions(zkClient, topic3, 6)
+     // wait until leader is elected
+     var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 1, 500)
+     var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 2, 500)
+     var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 3, 500)
+     var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 4, 500)
+     var leader5 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 5, 500)
+     var leader6 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 6, 500)
+ 
+     val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 1).get
+     val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 2).get
+     val leader3FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 3).get
+     val leader4FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 4).get
+     val leader5FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 5).get
+     val leader6FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 6).get
+ 
+     assertEquals(leader1.get, leader1FromZk)
+     assertEquals(leader2.get, leader2FromZk)
+     assertEquals(leader3.get, leader3FromZk)
+     assertEquals(leader4.get, leader4FromZk)
+     assertEquals(leader5.get, leader5FromZk)
+     assertEquals(leader6.get, leader6FromZk)
+ 
+     // read metadata from a broker and verify the new topic partitions exist
+     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1, 1000)
+     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 2, 1000)
+     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 3, 1000)
+     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 4, 1000)
+     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5, 1000)
+     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6, 1000)
+ 
+     val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement",
+       2000,0).topicsMetadata
+ 
+     val metaDataForTopic3 = metadata.filter(p => p.topic.equals(topic3)).head
+     val partition1DataForTopic3 = metaDataForTopic3.partitionsMetadata(1)
+     val partition2DataForTopic3 = metaDataForTopic3.partitionsMetadata(2)
+     val partition3DataForTopic3 = metaDataForTopic3.partitionsMetadata(3)
+     val partition4DataForTopic3 = metaDataForTopic3.partitionsMetadata(4)
+     val partition5DataForTopic3 = metaDataForTopic3.partitionsMetadata(5)
+     val partition6DataForTopic3 = metaDataForTopic3.partitionsMetadata(6)
+ 
+     assertEquals(partition1DataForTopic3.replicas.size, 4)
+     assertEquals(partition1DataForTopic3.replicas(0).id, 3)
+     assertEquals(partition1DataForTopic3.replicas(1).id, 2)
+     assertEquals(partition1DataForTopic3.replicas(2).id, 0)
+     assertEquals(partition1DataForTopic3.replicas(3).id, 1)
+ 
+     assertEquals(partition2DataForTopic3.replicas.size, 4)
+     assertEquals(partition2DataForTopic3.replicas(0).id, 0)
+     assertEquals(partition2DataForTopic3.replicas(1).id, 3)
+     assertEquals(partition2DataForTopic3.replicas(2).id, 1)
+     assertEquals(partition2DataForTopic3.replicas(3).id, 2)
+ 
+     assertEquals(partition3DataForTopic3.replicas.size, 4)
+     assertEquals(partition3DataForTopic3.replicas(0).id, 1)
+     assertEquals(partition3DataForTopic3.replicas(1).id, 0)
+     assertEquals(partition3DataForTopic3.replicas(2).id, 2)
+     assertEquals(partition3DataForTopic3.replicas(3).id, 3)
+ 
+     assertEquals(partition4DataForTopic3.replicas.size, 4)
+     assertEquals(partition4DataForTopic3.replicas(0).id, 2)
+     assertEquals(partition4DataForTopic3.replicas(1).id, 3)
+     assertEquals(partition4DataForTopic3.replicas(2).id, 0)
+     assertEquals(partition4DataForTopic3.replicas(3).id, 1)
+ 
+     assertEquals(partition5DataForTopic3.replicas.size, 4)
+     assertEquals(partition5DataForTopic3.replicas(0).id, 3)
+     assertEquals(partition5DataForTopic3.replicas(1).id, 0)
+     assertEquals(partition5DataForTopic3.replicas(2).id, 1)
+     assertEquals(partition5DataForTopic3.replicas(3).id, 2)
+ 
+     assertEquals(partition6DataForTopic3.replicas.size, 4)
+     assertEquals(partition6DataForTopic3.replicas(0).id, 0)
+     assertEquals(partition6DataForTopic3.replicas(1).id, 1)
+     assertEquals(partition6DataForTopic3.replicas(2).id, 2)
+     assertEquals(partition6DataForTopic3.replicas(3).id, 3)
+   }
+ }
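
Outside the test harness, the same command can be driven directly. A minimal sketch assuming the addPartitions signatures exercised above (partition count only, or count plus a colon/comma replica-assignment string) and the ZkClient construction shown in the KafkaServer.initZk hunk; the AddPartitionsSketch object, the "my-topic" name, and the localhost connection string are illustrative only:

    import org.I0Itec.zkclient.ZkClient
    import kafka.admin.AddPartitionsCommand
    import kafka.utils.ZKStringSerializer

    object AddPartitionsSketch {
      def main(args: Array[String]) {
        // Hypothetical standalone usage, not part of this commit.
        val zkClient = new ZkClient("localhost:2181", 6000, 6000, ZKStringSerializer)
        try {
          // Two new partitions, placed explicitly on replicas (0,1) and (2,3),
          // mirroring the "0:1,2:3" format exercised in testManualAssignmentOfReplicas.
          AddPartitionsCommand.addPartitions(zkClient, "my-topic", 2, "0:1,2:3")
        } finally {
          zkClient.close()
        }
      }
    }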

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/admin/AdminTest.scala
index cc394a3,dc0013f..b4f4c58
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@@ -64,25 -74,54 +64,25 @@@ class AdminTest extends JUnit3Suite wit
  
    @Test
    def testManualReplicaAssignment() {
 -    val brokerList = Set(0, 1, 2, 3, 4)
 +    val brokers = List(0, 1, 2, 3, 4)
 +    TestUtils.createBrokersInZk(zkClient, brokers)
  
 -    // duplicated brokers
 -    try {
 -      val replicationAssignmentStr = "0,0,1:1,2,3"
 -      CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
 -      fail("replication assginment shouldn't have duplicated brokers")
 -    }
 -    catch {
 -      case e: AdministrationException => // this is good
 -      case e2 => throw e2
 -    }
 -
 -    // non-exist brokers
 -    try {
 -      val replicationAssignmentStr = "0,1,2:1,2,7"
 -      CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
 -      fail("replication assginment shouldn't contain non-exist brokers")
 -    }
 -    catch {
 -      case e: AdministrationException => // this is good
 -      case e2 => throw e2
 +    // duplicate brokers
 +    intercept[IllegalArgumentException] {
-       AdminUtils.createTopicWithAssignment(zkClient, "test", Map(0->Seq(0,0)))
++      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", Map(0->Seq(0,0)))
      }
  
      // inconsistent replication factor
 -    try {
 -      val replicationAssignmentStr = "0,1,2:1,2"
 -      CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
 -      fail("all partitions should have the same replication factor")
 -    }
 -    catch {
 -      case e: AdministrationException => // this is good
 -      case e2 => throw e2
 +    intercept[IllegalArgumentException] {
-       AdminUtils.createTopicWithAssignment(zkClient, "test", Map(0->Seq(0,1), 1->Seq(0)))
++      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", Map(0->Seq(0,1), 1->Seq(0)))
      }
  
      // good assignment
 -    {
 -      val replicationAssignmentStr = "0:1:2,1:2:3"
 -      val expectedReplicationAssignment = Map(
 -        0 -> List(0, 1, 2),
 -        1 -> List(1, 2, 3)
 -      )
 -      val actualReplicationAssignment = CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
 -      assertEquals(expectedReplicationAssignment.size, actualReplicationAssignment.size)
 -      for( (part, replicas) <- expectedReplicationAssignment ) {
 -        assertEquals(replicas, actualReplicationAssignment(part))
 -      }
 -    }
 +    val assignment = Map(0 -> List(0, 1, 2),
 +                         1 -> List(1, 2, 3))
-     AdminUtils.createTopicWithAssignment(zkClient, "test", assignment)               
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", assignment)
 +    val found = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq("test"))
 +    assertEquals(assignment, found("test"))
    }
  
    @Test
@@@ -118,7 -157,7 +118,7 @@@
      val topic = "test"
      TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
      // create the topic
-     AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
 -    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
      // create leaders for all partitions
      TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
      val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap
@@@ -126,9 -165,12 +126,9 @@@
      for(i <- 0 until actualReplicaList.size)
        assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
  
 -    try {
 -      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
 -      fail("shouldn't be able to create a topic already exists")
 -    } catch {
 -      case e: TopicExistsException => // this is good
 -      case e2 => throw e2
 +    intercept[TopicExistsException] {
 +      // shouldn't be able to create a topic that already exists
-       AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
++      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
      }
    }
  
@@@ -139,7 -181,7 +139,7 @@@
      // create brokers
      val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
      // create the topic
-     AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
 -    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
      // reassign partition 0
      val newReplicas = Seq(0, 2, 3)
      val partitionToBeReassigned = 0
@@@ -164,7 -206,7 +164,7 @@@
      // create brokers
      val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
      // create the topic
-     AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
 -    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
      // reassign partition 0
      val newReplicas = Seq(1, 2, 3)
      val partitionToBeReassigned = 0
@@@ -190,7 -232,7 +190,7 @@@
      // create brokers
      val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
      // create the topic
-     AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
 -    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
      // reassign partition 0
      val newReplicas = Seq(2, 3)
      val partitionToBeReassigned = 0
@@@ -231,7 -273,7 +231,7 @@@
      val expectedReplicaAssignment = Map(0  -> List(0, 1))
      val topic = "test"
      // create the topic
-     AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
 -    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
      // put the partition in the reassigned path as well
      // reassign partition 0
      val newReplicas = Seq(0, 1)
@@@ -270,7 -312,7 +270,7 @@@
      // create brokers
      val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
      // create the topic
-     AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
 -    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
      val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
      // broker 2 should be the leader since it was started first
      val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get
@@@ -291,7 -333,8 +291,7 @@@
      val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
      val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
      // create the topic
-     AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
 -    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
 -
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
      TestUtils.waitUntilMetadataIsPropagated(servers, topic, partition, 1000)
  
      val controllerId = ZkUtils.getController(zkClient)

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index b43b063,1ee34b9..ef1de83
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@@ -61,7 -60,7 +61,7 @@@ class ConsumerIteratorTest extends JUni
  
    override def setUp() {
      super.setUp
-     AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)), new Properties)
 -    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)), new Properties)
      waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
    }
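
The replacement pattern across these test hunks is uniform: CreateTopicCommand string assignments become an explicit partition-to-replica map (optionally with a Properties of per-topic config, as in the hunk above), while the count-based form used further down in the SyncProducerTest hunk goes through AdminUtils.createTopic. A minimal sketch under those assumptions; the CreateTopicSketch object and the topic names are illustrative only:

    import java.util.Properties
    import org.I0Itec.zkclient.ZkClient
    import kafka.admin.AdminUtils

    object CreateTopicSketch {
      def createTopics(zkClient: ZkClient) {
        // Explicit placement: partition 0 on brokers 0 and 1, partition 1 on brokers 1 and 2.
        AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(
          zkClient, "placed-topic", Map(0 -> Seq(0, 1), 1 -> Seq(1, 2)), new Properties)

        // Count-based form: 1 partition, replication factor 1, placement chosen by Kafka.
        AdminUtils.createTopic(zkClient, "auto-topic", 1, 1)
      }
    }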
  

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index b1d56d6,c5cddea..47130d3
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@@ -55,7 -55,7 +55,7 @@@ class FetcherTest extends JUnit3Suite w
  
    override def setUp() {
      super.setUp
-     AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)))
 -    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)))
      waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
      fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
      fetcher.stopConnections()

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index cb0e349,26e9bd6..b585f0e
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@@ -79,10 -79,11 +79,10 @@@ class RollingBounceTest extends JUnit3S
      val topic4 = "new-topic4"
  
      // create topics with 1 partition, 2 replicas, one on each broker
-     AdminUtils.createTopicWithAssignment(zkClient, topic1, Map(0->Seq(0,1)))
-     AdminUtils.createTopicWithAssignment(zkClient, topic2, Map(0->Seq(1,2)))
-     AdminUtils.createTopicWithAssignment(zkClient, topic3, Map(0->Seq(2,3)))
-     AdminUtils.createTopicWithAssignment(zkClient, topic4, Map(0->Seq(0,3)))
 -    CreateTopicCommand.createTopic(zkClient, topic1, 1, 2, "0:1")
 -    CreateTopicCommand.createTopic(zkClient, topic2, 1, 2, "1:2")
 -    CreateTopicCommand.createTopic(zkClient, topic3, 1, 2, "2:3")
 -    CreateTopicCommand.createTopic(zkClient, topic4, 1, 2, "0:3")
 -
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic1, Map(0->Seq(0,1)))
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic2, Map(0->Seq(1,2)))
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic3, Map(0->Seq(2,3)))
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic4, Map(0->Seq(0,3)))
  
      // wait until leader is elected
      var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partitionId, 500)

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 72eccc1,29331db..f546c15
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@@ -206,7 -210,7 +207,7 @@@ class ProducerTest extends JUnit3Suite 
  
      val topic = "new-topic"
      // create topic
-     AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0)))
 -    CreateTopicCommand.createTopic(zkClient, topic, 4, 2, "0,0,0,0")
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0)))
      // waiting for 1 partition is enough
      TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
@@@ -268,7 -273,7 +270,7 @@@
  
      val topic = "new-topic"
      // create topics in ZK
-     AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1)))
 -    CreateTopicCommand.createTopic(zkClient, topic, 4, 2, "0:1,0:1,0:1,0:1")
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0,1)))
      TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
  

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index bbf0406,b3e89c3..6fa1abc
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@@ -114,6 -114,33 +114,33 @@@ class SyncProducerTest extends JUnit3Su
    }
  
    @Test
+   def testMessageSizeTooLargeWithAckZero() {
+     val server = servers.head
+ 
+     val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+     props.put("request.required.acks", "0")
+ 
+     val producer = new SyncProducer(new SyncProducerConfig(props))
 -    CreateTopicCommand.createTopic(zkClient, "test", 1, 1)
++    AdminUtils.createTopic(zkClient, "test", 1, 1)
+     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500)
+ 
+     // This message will be dropped silently since message size too large.
+     producer.send(TestUtils.produceRequest("test", 0,
+       new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
+ 
+     // Send another message whose size is large enough to exceed the buffer size so
+     // the socket buffer will be flushed immediately;
+     // this send should fail since the socket has been closed
+     try {
+       producer.send(TestUtils.produceRequest("test", 0,
+         new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
+     } catch {
+       case e : java.io.IOException => // success
+       case e2 => throw e2
+     }
+   }
+ 
+   @Test
    def testProduceCorrectlyReceivesResponse() {
      val server = servers.head
      val props = TestUtils.getSyncProducerConfig(server.socketServer.port)

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index bb6fbdf,70e4b51..38e3ae7
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@@ -61,7 -61,7 +61,7 @@@ class LeaderElectionTest extends JUnit3
      val partitionId = 0
  
      // create topic with 1 partition, 2 replicas, one on each broker
-     AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(0, 1)))
 -    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0 -> Seq(0, 1)))
  
      // wait until leader is elected
      val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@@ -108,7 -108,7 +108,7 @@@
      val partitionId = 0
  
      // create topic with 1 partition, 2 replicas, one on each broker
-     AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(0, 1)))
 -    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0 -> Seq(0, 1)))
  
      // wait until leader is elected
      val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index d2650e3,db46247..34e39e7
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@@ -71,7 -62,7 +71,7 @@@ class LogRecoveryTest extends JUnit3Sui
      producer = new Producer[Int, String](new ProducerConfig(producerProps))
  
      // create topic with 1 partition, 2 replicas, one on each broker
-     AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1)))
 -    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0,1)))
  
      // wait until leader is elected
      var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@@ -104,7 -96,7 +104,7 @@@
      producer = new Producer[Int, String](new ProducerConfig(producerProps))
  
      // create topic with 1 partition, 2 replicas, one on each broker
-     AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1)))
 -    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0,1)))
  
      // wait until leader is elected
      var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@@ -168,7 -161,7 +168,7 @@@
      producer = new Producer[Int, String](new ProducerConfig(producerProps))
  
      // create topic with 1 partition, 2 replicas, one on each broker
-     AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1)))
 -    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0,1)))
  
      // wait until leader is elected
      var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@@ -201,7 -195,7 +201,7 @@@
      producer = new Producer[Int, String](new ProducerConfig(producerProps))
  
      // create topic with 1 partition, 2 replicas, one on each broker
-     AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(server1.config.brokerId, server2.config.brokerId)))
 -    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
++    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(server1.config.brokerId, server2.config.brokerId)))
  
      // wait until leader is elected
      var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)

http://git-wip-us.apache.org/repos/asf/kafka/blob/75d95d9b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------

