kafka-commits mailing list archives

From jun...@apache.org
Subject [42/43] git commit: kafka-896; merge 0.8 (988d4d8e65a14390abd748318a64e281e4a37c19) to trunk; patched by Jun Rao; reviewed by Jay Kreps
Date Mon, 08 Jul 2013 23:15:07 GMT
kafka-896; merge 0.8 (988d4d8e65a14390abd748318a64e281e4a37c19) to trunk; patched by Jun Rao; reviewed by Jay Kreps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c98bdd3e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c98bdd3e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c98bdd3e

Branch: refs/heads/trunk
Commit: c98bdd3e4f6edb7a4db97a876c26e7d37e88e12a
Parents: 731ba90 988d4d8
Author: Jun Rao <junrao@gmail.com>
Authored: Mon Jul 8 15:42:24 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Jul 8 15:42:24 2013 -0700

----------------------------------------------------------------------
 bin/kafka-run-class.sh                          |  36 +-
 config/consumer.properties                      |   6 +-
 config/producer.properties                      |   4 +-
 config/server.properties                        |   6 +-
 .../main/java/kafka/etl/impl/DataGenerator.java |   2 +-
 contrib/hadoop-producer/README.md               |   2 +-
 .../kafka/bridge/hadoop/KafkaOutputFormat.java  |   4 +-
 .../kafka/bridge/hadoop/KafkaRecordWriter.java  |   5 +-
 core/build.sbt                                  |   6 +-
 core/lib/metrics-annotation-3.0.0-c0c8be71.jar  | Bin 4766 -> 0 bytes
 core/lib/metrics-core-3.0.0-c0c8be71.jar        | Bin 81782 -> 0 bytes
 core/lib/zkclient-20120522.jar                  | Bin 99193 -> 0 bytes
 .../src/main/scala/kafka/admin/AdminUtils.scala |   4 +-
 .../scala/kafka/admin/ListTopicCommand.scala    |  90 ----
 .../PreferredReplicaLeaderElectionCommand.scala |   6 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |  50 ++-
 .../scala/kafka/api/LeaderAndIsrRequest.scala   |  42 +-
 .../main/scala/kafka/api/TopicMetadata.scala    |  12 +
 .../main/scala/kafka/client/ClientUtils.scala   |  18 +-
 .../main/scala/kafka/cluster/Partition.scala    |  70 ++-
 core/src/main/scala/kafka/cluster/Replica.scala |  10 +-
 .../LeaderElectionNotNeededException.scala      |  27 ++
 .../kafka/common/NoReplicaOnlineException.scala |  28 ++
 .../common/PartitionOfflineException.scala      |  28 --
 .../scala/kafka/consumer/ConsoleConsumer.scala  |   2 +-
 .../kafka/consumer/ConsumerConnector.scala      |   4 +-
 .../kafka/consumer/ConsumerFetcherManager.scala |  43 +-
 .../kafka/consumer/ConsumerFetcherThread.scala  |   2 +-
 .../scala/kafka/consumer/ConsumerIterator.scala |   2 +-
 .../kafka/consumer/PartitionTopicInfo.scala     |  17 +-
 .../consumer/ZookeeperConsumerConnector.scala   |   6 +-
 .../controller/ControllerChannelManager.scala   |   8 +-
 .../kafka/controller/KafkaController.scala      | 118 +++---
 .../controller/PartitionLeaderSelector.scala    |  60 +--
 .../controller/PartitionStateMachine.scala      |  44 +-
 .../kafka/controller/ReplicaStateMachine.scala  |  45 +-
 .../scala/kafka/javaapi/producer/Producer.scala |   4 +-
 .../main/scala/kafka/log/FileMessageSet.scala   |   2 +-
 core/src/main/scala/kafka/log/Log.scala         |   6 +-
 core/src/main/scala/kafka/log/LogManager.scala  |   2 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |  10 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala |   2 +-
 .../kafka/message/ByteBufferMessageSet.scala    |   2 +-
 .../scala/kafka/network/RequestChannel.scala    |  18 +-
 .../main/scala/kafka/network/SocketServer.scala |  20 +-
 .../kafka/producer/BrokerPartitionInfo.scala    |   4 +-
 .../scala/kafka/producer/ConsoleProducer.scala  |   4 +-
 .../kafka/producer/KafkaLog4jAppender.scala     |   4 +-
 .../scala/kafka/producer/ProducerConfig.scala   |   2 +-
 .../producer/async/DefaultEventHandler.scala    |   2 +-
 .../producer/async/ProducerSendThread.scala     |   2 +-
 .../kafka/server/AbstractFetcherManager.scala   |   4 +-
 .../kafka/server/AbstractFetcherThread.scala    |  62 +--
 .../src/main/scala/kafka/server/KafkaApis.scala |  43 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   4 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  14 +-
 .../scala/kafka/server/ReplicaManager.scala     |  38 +-
 .../scala/kafka/server/RequestPurgatory.scala   |   4 +-
 .../scala/kafka/tools/KafkaMigrationTool.java   |  30 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    |  79 ++--
 .../scala/kafka/tools/ReplayLogProducer.scala   |   4 +-
 .../kafka/tools/VerifyConsumerRebalance.scala   |  12 +-
 core/src/main/scala/kafka/utils/Utils.scala     |   2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  43 +-
 .../scala/other/kafka/TestEndToEndLatency.scala |   4 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |   7 +-
 .../api/RequestResponseSerializationTest.scala  |   2 +-
 .../ZookeeperConsumerConnectorTest.scala        |   8 +-
 .../unit/kafka/integration/FetcherTest.scala    |   4 +-
 .../unit/kafka/metrics/KafkaTimerTest.scala     |  10 +-
 .../unit/kafka/network/SocketServerTest.scala   |   2 +
 .../unit/kafka/producer/AsyncProducerTest.scala |  18 +-
 .../unit/kafka/producer/ProducerTest.scala      |  14 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |   3 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  24 +-
 .../src/main/java/kafka/examples/Consumer.java  |   6 +-
 .../src/main/java/kafka/examples/Producer.java  |   2 +-
 .../scala/kafka/perf/ConsumerPerformance.scala  |   2 +-
 .../scala/kafka/perf/ProducerPerformance.scala  |   2 +-
 project/Build.scala                             |  88 ++--
 project/build/KafkaProject.scala                |  46 --
 project/plugins.sbt                             |   2 +-
 sbt                                             |   2 +-
 .../config/migration_producer.properties        |   2 +-
 .../config/server.properties                    |   2 +
 .../testcase_9001/testcase_9001_properties.json |   6 +-
 .../testcase_9003/testcase_9003_properties.json |   6 +-
 .../testcase_9004/testcase_9004_properties.json |   6 +-
 .../testcase_9005/testcase_9005_properties.json |   6 +-
 .../testcase_9006/testcase_9006_properties.json |   6 +-
 .../mirror_maker_testsuite/cluster_config.json  |  10 +
 .../config/mirror_consumer.properties           |   6 +-
 .../config/mirror_producer.properties           |   5 +-
 .../config/server.properties                    |   4 +-
 .../mirror_maker_testsuite/mirror_maker_test.py |  32 +-
 .../testcase_5001/testcase_5001_properties.json |  22 +
 .../testcase_5002/testcase_5002_properties.json |  22 +
 .../testcase_5003/testcase_5003_properties.json |  13 +
 .../testcase_5004/testcase_5004_properties.json |  13 +
 .../testcase_5005/testcase_5005_properties.json |  14 +
 .../testcase_5006/testcase_5006_properties.json |  14 +
 .../config/server.properties                    |   4 +-
 .../replication_testsuite/replica_basic_test.py |  15 +-
 system_test/utils/kafka_system_test_utils.py    | 421 ++++++++++---------
 system_test/utils/replication_utils.py          |   3 +
 system_test/utils/system_test_utils.py          |  78 +++-
 107 files changed, 1225 insertions(+), 963 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/config/server.properties
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/admin/ListTopicCommand.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/admin/ListTopicCommand.scala
index f91eca2,c760cc0..0000000
deleted file mode 100644,100644
--- a/core/src/main/scala/kafka/admin/ListTopicCommand.scala
+++ /dev/null
@@@ -1,90 -1,107 +1,0 @@@
--/**
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *    http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package kafka.admin
--
--import joptsimple.OptionParser
--import org.I0Itec.zkclient.ZkClient
--import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
- import kafka.common.ErrorMapping
--
--object ListTopicCommand {
--
--  def main(args: Array[String]): Unit = {
--    val parser = new OptionParser
-     val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be deleted.")
-                          .withRequiredArg
-                          .describedAs("topic")
-                          .ofType(classOf[String])
-                          .defaultsTo("")
-     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
-                                       "Multiple URLS can be given to allow fail-over.")
-                            .withRequiredArg
-                            .describedAs("urls")
-                            .ofType(classOf[String])
- 
-     val options = parser.parse(args : _*)
- 
-     for(arg <- List(zkConnectOpt)) {
-       if(!options.has(arg)) {
-         System.err.println("Missing required argument \"" + arg + "\"")
-         parser.printHelpOn(System.err)
-         System.exit(1)
-       }
-     }
- 
-     val topic = options.valueOf(topicOpt)
-     val zkConnect = options.valueOf(zkConnectOpt)
-     var zkClient: ZkClient = null
-     try {
-       var topicList: Seq[String] = Nil
-       zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
- 
-       if (topic == "")
-         topicList = ZkUtils.getChildren(zkClient, ZkUtils.BrokerTopicsPath)
-       else
-         topicList = List(topic)
- 
-       if (topicList.size <= 0)
-         println("no topics exist!")
- 
-       for (t <- topicList)
-         showTopic(t, zkClient)
-     }
-     catch {
-       case e =>
-         println("list topic failed because of " + e.getMessage)
-         println(Utils.stackTrace(e))
-     }
-     finally {
-       if (zkClient != null)
-         zkClient.close()
-     }
-   }
- 
-   def showTopic(topic: String, zkClient: ZkClient) {
-     val topicMetaData = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-     topicMetaData.errorCode match {
-       case ErrorMapping.UnknownTopicOrPartitionCode =>
-         println("topic " + topic + " doesn't exist!")
-       case _ =>
-         println("topic: " + topic)
-         for (part <- topicMetaData.partitionsMetadata)
-           println(part.toString)
-     }
-   }
- }
 -    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be listed. Defaults to all existing topics.")
 -                         .withRequiredArg
 -                         .describedAs("topic")
 -                         .ofType(classOf[String])
 -                         .defaultsTo("")
 -    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
 -                                      "Multiple URLS can be given to allow fail-over.")
 -                           .withRequiredArg
 -                           .describedAs("urls")
 -                           .ofType(classOf[String])
 -    val reportUnderReplicatedPartitionsOpt = parser.accepts("under-replicated-partitions",
 -                                                            "if set, only show under replicated partitions")
 -    val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions",
 -                                                            "if set, only show partitions whose leader is not available")
 -
 -    val options = parser.parse(args : _*)
 -
 -    for(arg <- List(zkConnectOpt)) {
 -      if(!options.has(arg)) {
 -        System.err.println("Missing required argument \"" + arg + "\"")
 -        parser.printHelpOn(System.err)
 -        System.exit(1)
 -      }
 -    }
 -
 -    val topic = options.valueOf(topicOpt)
 -    val zkConnect = options.valueOf(zkConnectOpt)
 -    val reportUnderReplicatedPartitions = if (options.has(reportUnderReplicatedPartitionsOpt)) true else false
 -    val reportUnavailablePartitions = if (options.has(reportUnavailablePartitionsOpt)) true else false
 -    var zkClient: ZkClient = null
 -    try {
 -      var topicList: Seq[String] = Nil
 -      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
 -
 -      if (topic == "")
 -        topicList = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).sorted
 -      else
 -        topicList = List(topic)
 -
 -      if (topicList.size <= 0)
 -        println("no topics exist!")
 -
 -      val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet
 -      for (t <- topicList)
 -        showTopic(t, zkClient, reportUnderReplicatedPartitions, reportUnavailablePartitions, liveBrokers)
 -    }
 -    catch {
 -      case e =>
 -        println("list topic failed because of " + e.getMessage)
 -        println(Utils.stackTrace(e))
 -    }
 -    finally {
 -      if (zkClient != null)
 -        zkClient.close()
 -    }
 -  }
 -
 -  def showTopic(topic: String, zkClient: ZkClient, reportUnderReplicatedPartitions: Boolean,
 -                reportUnavailablePartitions: Boolean, liveBrokers: Set[Int]) {
 -    ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match {
 -      case Some(topicPartitionAssignment) =>
 -        val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
 -        for ((partitionId, assignedReplicas) <- sortedPartitions) {
 -          val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId)
 -          val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId)
 -          if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
 -              (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
 -              (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
 -            print("topic: " + topic)
 -            print("\tpartition: " + partitionId)
 -            print("\tleader: " + (if(leader.isDefined) leader.get else "none"))
 -            print("\treplicas: " + assignedReplicas.mkString(","))
 -            println("\tisr: " + inSyncReplicas.mkString(","))
 -          }
 -        }
 -      case None =>
 -        println("topic " + topic + " doesn't exist!")
 -    }
 -  }
 -}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index a2afd16,d5de5f3..53fc433
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@@ -101,11 -101,10 +101,11 @@@ object PreferredReplicaLeaderElectionCo
        info("Created preferred replica election path with %s".format(jsonData))
      } catch {
        case nee: ZkNodeExistsException =>
 -        val partitionsUndergoingPreferredReplicaElection = parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
 -        throw new AdministrationException("Preferred replica leader election currently in progress for " +
 +        val partitionsUndergoingPreferredReplicaElection =
-           PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(ZkUtils.readData(zkClient, zkPath)._1)
++          PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
 +        throw new AdminOperationException("Preferred replica leader election currently in progress for " +
            "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
 -      case e2 => throw new AdministrationException(e2.toString)
 +      case e2 => throw new AdminOperationException(e2.toString)
      }
    }
  }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/admin/TopicCommand.scala
index d364608,0000000..e604233
mode 100644,000000..100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@@ -1,185 -1,0 +1,207 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package kafka.admin
 +
 +import joptsimple._
 +import java.util.Properties
 +import kafka.utils._
 +import org.I0Itec.zkclient.ZkClient
 +import scala.collection._
 +import scala.collection.JavaConversions._
 +import kafka.common.Topic
 +import kafka.cluster.Broker
 +
 +object TopicCommand {
 +
 +  def main(args: Array[String]): Unit = {
 +    
 +    val opts = new TopicCommandOptions(args)
 +    
 +    // should have exactly one action
 +    val actions = Seq(opts.createOpt, opts.deleteOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _)
 +    if(actions != 1) {
 +      System.err.println("Command must include exactly one action: --list, --describe, --create, --delete, or --alter")
 +      opts.parser.printHelpOn(System.err)
 +      System.exit(1)
 +    }
 +      
 +    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
 +    
 +    val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
 +    
 +    if(opts.options.has(opts.createOpt))
 +      createTopic(zkClient, opts)
 +    else if(opts.options.has(opts.alterOpt))
 +      alterTopic(zkClient, opts)
 +    else if(opts.options.has(opts.deleteOpt))
 +      deleteTopic(zkClient, opts)
 +    else if(opts.options.has(opts.listOpt))
 +      listTopics(zkClient)
 +    else if(opts.options.has(opts.describeOpt))
 +      describeTopic(zkClient, opts)
 +
 +    zkClient.close()
 +  }
 +
 +  def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
 +    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
 +    val topics = opts.options.valuesOf(opts.topicOpt)
 +    val configs = parseTopicConfigs(opts)
 +    for (topic <- topics) {
 +      if (opts.options.has(opts.replicaAssignmentOpt)) {
 +        val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
 +        AdminUtils.createTopicWithAssignment(zkClient, topic, assignment, configs)
 +      } else {
 +        CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
 +        val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
 +        val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
 +        AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
 +      }
 +      println("Created topic \"%s\".".format(topic))
 +    }
 +  }
 +  
 +  def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
 +    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
 +    val topics = opts.options.valuesOf(opts.topicOpt)
 +    val configs = parseTopicConfigs(opts)
 +    if(opts.options.has(opts.partitionsOpt))
 +      Utils.croak("Changing the number of partitions is not supported.")
 +    if(opts.options.has(opts.replicationFactorOpt))
 +      Utils.croak("Changing the replication factor is not supported.")
 +    for(topic <- topics) {
 +      AdminUtils.changeTopicConfig(zkClient, topic, configs)
 +      println("Updated config for topic \"%s\".".format(topic))
 +    }
 +  }
 +  
 +  def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
 +    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
 +    for(topic <- opts.options.valuesOf(opts.topicOpt)) {
 +      AdminUtils.deleteTopic(zkClient, topic)
 +      println("Topic \"%s\" deleted.".format(topic))
 +    }
 +  }
 +  
 +  def listTopics(zkClient: ZkClient) {
 +    for(topic <- ZkUtils.getAllTopics(zkClient).sorted)
 +      println(topic)
 +  }
 +  
 +  def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
-     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
-     val topics = opts.options.valuesOf(opts.topicOpt)
-     val metadata = AdminUtils.fetchTopicMetadataFromZk(topics.toSet, zkClient)
-     for(md <- metadata) {
-       println(md.topic)
-       val config = AdminUtils.fetchTopicConfig(zkClient, md.topic)
-       println("\tconfigs: " + config.map(kv => kv._1 + " = " + kv._2).mkString(", "))
-       println("\tpartitions: " + md.partitionsMetadata.size)
-       for(pd <- md.partitionsMetadata) {
-         println("\t\tpartition " + pd.partitionId)
-         println("\t\tleader: " + (if(pd.leader.isDefined) formatBroker(pd.leader.get) else "none"))
-         println("\t\treplicas: " + pd.replicas.map(formatBroker).mkString(", "))
-         println("\t\tisr: " + pd.isr.map(formatBroker).mkString(", "))
++    var topics: Seq[String] = opts.options.valuesOf(opts.topicOpt).toSeq.sorted
++    if (topics.size <= 0)
++      topics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).sorted
++    val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false
++    val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false
++    val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet
++    for (topic <- topics) {
++      ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match {
++        case Some(topicPartitionAssignment) =>
++          val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
++          if (!reportUnavailablePartitions && !reportUnderReplicatedPartitions) {
++            println(topic)
++            val config = AdminUtils.fetchTopicConfig(zkClient, topic)
++            println("\tconfigs: " + config.map(kv => kv._1 + " = " + kv._2).mkString(", "))
++            println("\tpartitions: " + sortedPartitions.size)
++          }
++          for ((partitionId, assignedReplicas) <- sortedPartitions) {
++            val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId)
++            val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId)
++            if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
++                (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
++                (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
++              print("\t\ttopic: " + topic)
++              print("\tpartition: " + partitionId)
++              print("\tleader: " + (if(leader.isDefined) leader.get else "none"))
++              print("\treplicas: " + assignedReplicas.mkString(","))
++              println("\tisr: " + inSyncReplicas.mkString(","))
++            }
++          }
++        case None =>
++          println("topic " + topic + " doesn't exist!")
 +      }
 +    }
 +  }
 +  
 +  def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
 +  
 +  def parseTopicConfigs(opts: TopicCommandOptions): Properties = {
 +    val configs = opts.options.valuesOf(opts.configOpt).map(_.split("\\s*=\\s*"))
 +    require(configs.forall(_.length == 2), "Invalid topic config: all configs must be in the format \"key=val\".")
 +    val props = new Properties
 +    configs.foreach(pair => props.setProperty(pair(0), pair(1)))
 +    props
 +  }
 +  
 +  def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
 +    val partitionList = replicaAssignmentList.split(",")
 +    val ret = new mutable.HashMap[Int, List[Int]]()
 +    for (i <- 0 until partitionList.size) {
 +      val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
 +      ret.put(i, brokerList.toList)
 +      if (ret(i).size != ret(0).size)
 +        throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList)
 +    }
 +    ret.toMap
 +  }
 +  
 +  class TopicCommandOptions(args: Array[String]) {
 +    val parser = new OptionParser
 +    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
 +                                      "Multiple URLS can be given to allow fail-over.")
 +                           .withRequiredArg
 +                           .describedAs("urls")
 +                           .ofType(classOf[String])
 +    val listOpt = parser.accepts("list", "List all available topics.")
 +    val createOpt = parser.accepts("create", "Create a new topic.")
 +    val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.")
 +    val deleteOpt = parser.accepts("delete", "Delete the topic.")
 +    val describeOpt = parser.accepts("describe", "List details for the given topics.")
 +    val helpOpt = parser.accepts("help", "Print usage information.")
 +    val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe.")
 +                         .withRequiredArg
 +                         .describedAs("topic")
 +                         .ofType(classOf[String])
 +    val configOpt = parser.accepts("config", "A topic configuration for the topic being created or altered.")
 +                          .withRequiredArg
 +                          .describedAs("name=value")
 +                          .ofType(classOf[String])
 +    val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created.")
 +                           .withRequiredArg
 +                           .describedAs("# of partitions")
 +                           .ofType(classOf[java.lang.Integer])
 +    val replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic being created.")
 +                           .withRequiredArg
 +                           .describedAs("replication factor")
 +                           .ofType(classOf[java.lang.Integer])
 +    val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created.")
 +                           .withRequiredArg
 +                           .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
 +                                        "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
 +                           .ofType(classOf[String])
-     
++    val reportUnderReplicatedPartitionsOpt = parser.accepts("under-replicated-partitions",
++                                                            "if set when describing topics, only show under replicated partitions")
++    val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions",
++                                                            "if set when describing topics, only show partitions whose leader is not available")
++
 +
 +    val options = parser.parse(args : _*)
 +  }
 +  
 +}
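
For illustration, a minimal sketch of how the replica-assignment string accepted by the new TopicCommand above is interpreted. The object name ReplicaAssignmentExample and the input string are hypothetical; the parsing semantics follow parseReplicaAssignment as added in this patch.

    import kafka.admin.TopicCommand

    object ReplicaAssignmentExample extends App {
      // Each comma-separated group describes one partition, in partition-index order;
      // the colon-separated values inside a group are that partition's replica broker ids.
      // parseReplicaAssignment requires every partition to list the same number of replicas.
      val assignment = TopicCommand.parseReplicaAssignment("0:1,1:2,2:0")
      // Expected mapping: partition 0 -> brokers 0,1; partition 1 -> brokers 1,2; partition 2 -> brokers 2,0
      println(assignment)
    }

The same options can be exercised end to end by launching kafka.admin.TopicCommand through bin/kafka-run-class.sh with, e.g., --zookeeper, --create, --topic, --partitions and --replication-factor (exact invocation depends on the local setup).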

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/cluster/Partition.scala
index f79a622,02d2c44..ce2a634
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@@ -25,9 -23,10 +25,10 @@@ import kafka.log.LogConfi
  import kafka.server.ReplicaManager
  import com.yammer.metrics.core.Gauge
  import kafka.metrics.KafkaMetricsGroup
- import kafka.common._
 -import kafka.common.{NotLeaderForPartitionException, ErrorMapping}
  import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
  import org.apache.log4j.Logger
+ import kafka.message.ByteBufferMessageSet
++import kafka.common.{TopicAndPartition, NotLeaderForPartitionException, ErrorMapping}
  
  
  /**
@@@ -80,13 -79,12 +81,16 @@@ class Partition(val topic: String
        case Some(replica) => replica
        case None =>
          if (isReplicaLocal(replicaId)) {
 -          val log = logManager.getOrCreateLog(topic, partitionId)
 -          val offset = replicaManager.highWatermarkCheckpoints(log.dir.getParent).read(topic, partitionId).min(log.logEndOffset)
 +          val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic))
 +          val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
-           val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent) 
-           val offset = checkpoint.read.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
++          val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
++          val offsetMap = checkpoint.read
++          if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
++            warn("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
++          val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
            val localReplica = new Replica(replicaId, this, time, offset, Some(log))
            addReplicaIfNotExists(localReplica)
 -        }
 -        else {
 +        } else {
            val remoteReplica = new Replica(replicaId, this, time)
            addReplicaIfNotExists(remoteReplica)
          }
@@@ -314,8 -323,25 +329,25 @@@
      stuckReplicas ++ slowReplicas
    }
  
 -  def appendMessagesToLeader(messages: ByteBufferMessageSet): (Long, Long) = {
++  def appendMessagesToLeader(messages: ByteBufferMessageSet) = {
+     leaderIsrUpdateLock synchronized {
+       val leaderReplicaOpt = leaderReplicaIfLocal()
+       leaderReplicaOpt match {
+         case Some(leaderReplica) =>
+           val log = leaderReplica.log.get
 -          val (start, end) = log.append(messages, assignOffsets = true)
++          val info = log.append(messages, assignOffsets = true)
+           // we may need to increment high watermark since ISR could be down to 1
+           maybeIncrementLeaderHW(leaderReplica)
 -          (start, end)
++          info
+         case None =>
+           throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
+             .format(topic, partitionId, localBrokerId))
+       }
+     }
+   }
+ 
    private def updateIsr(newIsr: Set[Replica]) {
-     info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(", ")))
+     debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
      val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
      // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
      val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 0921ce6,398618f..3f3a239
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@@ -163,9 -160,9 +163,9 @@@ private[kafka] class ZookeeperConsumerC
          wildcardTopicWatcher.shutdown()
        try {
          if (config.autoCommitEnable)
 -          scheduler.shutdownNow()
 +          scheduler.shutdown()
          fetcher match {
-           case Some(f) => f.shutdown
+           case Some(f) => f.stopConnections
            case None =>
          }
          sendShutdownToAllQueues()

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/FileMessageSet.scala
index abb160c,c4397b7..1afb533
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@@ -29,31 -29,22 +29,31 @@@ import java.util.concurrent.TimeUni
  import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
  
  /**
 - * An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts
 - * will fail on an immutable message set. An optional limit and start position can be applied to the message set
 - * which will control the position in the file at which the set begins.
 + * An on-disk message set. An optional start and end position can be applied to the message set
 + * which will allow slicing a subset of the file.
 + * @param file The file name for the underlying log data
 + * @param channel the underlying file channel used
 + * @param start A lower bound on the absolute position in the file from which the message set begins
 + * @param end The upper bound on the absolute position in the file at which the message set ends
 + * @param isSlice Should the start and end parameters be used for slicing?
   */
  @nonthreadsafe
 -class FileMessageSet private[kafka](val file: File,
 +class FileMessageSet private[kafka](@volatile var file: File,
                                      private[log] val channel: FileChannel,
 -                                    private[log] val start: Int = 0,
 -                                    private[log] val limit: Int = Int.MaxValue,
 -                                    initChannelPositionToEnd: Boolean = true) extends MessageSet with Logging {
 +                                    private[log] val start: Int,
 +                                    private[log] val end: Int,
 +                                    isSlice: Boolean) extends MessageSet with Logging {
    
    /* the size of the message set in bytes */
 -  private val _size = new AtomicInteger(scala.math.min(channel.size().toInt, limit) - start)
 +  private val _size = 
 +    if(isSlice)
 +      new AtomicInteger(end - start) // don't check the file size if this is just a slice view
 +    else
 +      new AtomicInteger(math.min(channel.size().toInt, end) - start)
  
 -  if (initChannelPositionToEnd) {
 +  /* if this is not a slice, update the file pointer to the end of the file */
 +  if (!isSlice) {
-     info("Creating or reloading log segment %s".format(file.getAbsolutePath))
+     debug("Creating or reloading log segment %s".format(file.getAbsolutePath))
      /* set the file position to the last byte in the file */
      channel.position(channel.size)
    }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/Log.scala
index d1c3d72,ef708e2..8c8d877
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@@ -63,26 -119,23 +63,26 @@@ class Log(val dir: File
    private val unflushed = new AtomicInteger(0)
  
    /* last time it was flushed */
 -  private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
 +  private val lastflushedTime = new AtomicLong(time.milliseconds)
  
    /* the actual segments of the log */
 -  private[log] val segments: SegmentList[LogSegment] = loadSegments()
 +  private val segments: ConcurrentNavigableMap[Long,LogSegment] = loadSegments()
 +  
 +  /* The number of times the log has been truncated */
 +  private val truncates = new AtomicInteger(0)
      
    /* Calculate the offset of the next message */
 -  private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset())
 -  
 +  private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset())
 +
-   debug("Completed load of log %s with log end offset %d".format(name, logEndOffset))
+   info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
  
    newGauge(name + "-" + "NumLogSegments",
-            new Gauge[Int] { def getValue = numberOfSegments })
+            new Gauge[Int] { def value = numberOfSegments })
  
    newGauge(name + "-" + "LogEndOffset",
-            new Gauge[Long] { def getValue = logEndOffset })
+            new Gauge[Long] { def value = logEndOffset })
  
 -  /* The name of this log */
 +  /** The name of this log */
    def name  = dir.getName()
  
    /* Load the log segments from the log files on disk */

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/LogManager.scala
index 0d567e4,4771d11..9002483
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@@ -198,17 -185,20 +198,17 @@@ class LogManager(val logDirs: Array[Fil
        val dataDir = nextLogDir()
        val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
        dir.mkdirs()
 -      val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
 -      val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logSegmentBytes)
        log = new Log(dir, 
 -                    maxLogFileSize, 
 -                    config.messageMaxBytes,
 -                    logFlushInterval, 
 -                    rollIntervalMs, 
 -                    needsRecovery = false, 
 -                    config.logIndexSizeMaxBytes,
 -                    config.logIndexIntervalBytes, 
 -                    time, 
 -                    config.brokerId)
 -      info("Created log for partition [%s,%d] in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath))
 +                    config,
 +                    needsRecovery = false,
 +                    scheduler,
 +                    time)
        logs.put(topicAndPartition, log)
-       info("Created log for topic %s partition %d in %s with properties {%s}."
++      info("Created log for partition [%s,%d] in %s with properties {%s}."
 +           .format(topicAndPartition.topic, 
 +                   topicAndPartition.partition, 
 +                   dataDir.getAbsolutePath,
 +                   JavaConversions.asMap(config.toProps).mkString(", ")))
        log
      }
    }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/LogSegment.scala
index 30d2e91,213db6e..fbdc553
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@@ -127,54 -113,18 +127,62 @@@ class LogSegment(val log: FileMessageSe
            min(endPosition - startPosition.position, maxSize) 
          }
        }
 -    messageSet.read(startPosition.position, length)
 +    log.read(startPosition.position, length)
 +  }
 +  
 +  /**
 +   * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log.
 +   * 
 +   * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
 +   * is corrupt.
 +   */
 +  @nonthreadsafe
 +  def recover(maxMessageSize: Int) {
 +    index.truncate()
 +    var validBytes = 0
 +    var lastIndexEntry = 0
 +    val iter = log.iterator(maxMessageSize)
 +    try {
 +      while(iter.hasNext) {
 +        val entry = iter.next
 +        entry.message.ensureValid()
 +        if(validBytes - lastIndexEntry > indexIntervalBytes) {
-           index.append(entry.offset, validBytes)
++          // we need to decompress the message, if required, to get the offset of the first uncompressed message
++          val startOffset =
++            entry.message.compressionCodec match {
++              case NoCompressionCodec =>
++                entry.offset
++              case _ =>
++                ByteBufferMessageSet.decompress(entry.message).head.offset
++          }
++          index.append(startOffset, validBytes)
 +          lastIndexEntry = validBytes
 +        }
 +        validBytes += MessageSet.entrySize(entry.message)
 +      }
 +    } catch {
 +      case e: InvalidMessageException => 
 +        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
 +    }
 +    val truncated = log.sizeInBytes - validBytes
 +    if(truncated > 0)
 +      warn("Truncated " + truncated + " invalid bytes from the log segment %s.".format(log.file.getAbsolutePath))
 +    log.truncateTo(validBytes)
    }
  
 -  override def toString() = "LogSegment(start=" + start + ", size=" + size + ")"
 +  override def toString() = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")"
  
    /**
 -   * Truncate off all index and log entries with offsets greater than or equal to the current offset. 
 +   * Truncate off all index and log entries with offsets >= the given offset.
 +   * If the given offset is larger than the largest message in this segment, do nothing.
 +   * @param offset The offset to truncate to
 +   * @return The number of log bytes truncated
     */
 -  def truncateTo(offset: Long) {
 +  @nonthreadsafe
 +  def truncateTo(offset: Long): Int = {
      val mapping = translateOffset(offset)
      if(mapping == null)
 -      return
 +      return 0
      index.truncateTo(offset)
      // after truncation, reset and allocate more space for the (new currently  active) index
      index.resize(index.maxIndexSize)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/OffsetIndex.scala
index eff213e,60ebc52..361a9db
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@@ -90,9 -90,12 +90,9 @@@ class OffsetIndex(@volatile var file: F
    /* the last offset in the index */
    var lastOffset = readLastOffset()
    
-   info("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
+   debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
      .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
  
 -  /* the maximum number of entries this index can hold */
 -  def maxEntries = mmap.limit / 8
 -
    /**
     * The last offset written to the index
     */

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/KafkaApis.scala
index d7d8bbd,f5288bf..358d617
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@@ -194,20 -186,17 +192,23 @@@ class KafkaApis(val requestChannel: Req
        BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
  
        try {
-         val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
-         val log = localReplica.log.get
-         val info = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
+         val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
 -        val (start, end) =
++        val info =
+           partitionOpt match {
+             case Some(partition) => partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
+             case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
+               .format(topicAndPartition, brokerId))
+ 
+           }
 +        val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1)
 +
 +        // update stats
 +        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages)
 +        BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
-         
-         // we may need to increment high watermark since ISR could be down to 1
-         localReplica.partition.maybeIncrementLeaderHW(localReplica)
++
          trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
 -              .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, start, end))
 -        ProduceResult(topicAndPartition, start, end)
 +              .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
 +        ProduceResult(topicAndPartition, info.firstOffset, info.lastOffset)
        } catch {
          // NOTE: Failed produce requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException
          // since failed produce requests metric is supposed to indicate failure of a broker in handling a produce request
@@@ -498,66 -434,6 +502,61 @@@
      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
    }
  
 +  /* 
 +   * Service the Offset commit API
 +   */
 +  def handleOffsetCommitRequest(request: RequestChannel.Request) {
 +    val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
-     if(requestLogger.isTraceEnabled)
-       requestLogger.trace("Handling offset commit request " + offsetCommitRequest.toString)
-     trace("Handling offset commit request " + offsetCommitRequest.toString)
 +    val responseInfo = offsetCommitRequest.requestInfo.map{
 +      case (topicAndPartition, metaAndError) => {
 +        val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
 +        try {
 +          if(metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
 +            (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
 +          } else {
 +            ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
 +              topicAndPartition.partition, metaAndError.offset.toString)
 +            (topicAndPartition, ErrorMapping.NoError)
 +          }
 +        } catch {
 +          case e => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
 +        }
 +      }
 +    }
 +    val response = new OffsetCommitResponse(responseInfo, 
 +                                            offsetCommitRequest.correlationId,
 +                                            offsetCommitRequest.clientId)
 +    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
 +  }
 +
 +  /*
 +   * Service the Offset fetch API
 +   */
 +  def handleOffsetFetchRequest(request: RequestChannel.Request) {
 +    val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
-     if(requestLogger.isTraceEnabled)
-       requestLogger.trace("Handling offset fetch request " + offsetFetchRequest.toString)
 +    val responseInfo = offsetFetchRequest.requestInfo.map( t => {
 +      val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic)
 +      try {
 +        val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + t.partition)._1
 +        payloadOpt match {
 +          case Some(payload) => {
 +            (t, OffsetMetadataAndError(offset=payload.toLong, error=ErrorMapping.NoError))
 +          } 
 +          case None => (t, OffsetMetadataAndError(OffsetMetadataAndError.InvalidOffset, OffsetMetadataAndError.NoMetadata,
 +                          ErrorMapping.UnknownTopicOrPartitionCode))
 +        }
 +      } catch {
 +        case e => 
 +          (t, OffsetMetadataAndError(OffsetMetadataAndError.InvalidOffset, OffsetMetadataAndError.NoMetadata,
 +             ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])))
 +      }
 +    })
 +    val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), 
 +                                           offsetFetchRequest.correlationId,
 +                                           offsetFetchRequest.clientId)
 +    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
 +  }
 +
    def close() {
      debug("Shutting down.")
      fetchRequestPurgatory.shutdown()
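
As context for the offset commit/fetch handlers above: committed offsets are stored as plain strings in ZooKeeper under the consumer group's per-topic offsets directory (ZKGroupTopicDirs.consumerOffsetDir). A minimal read-back sketch, assuming a ZooKeeper ensemble at localhost:2181 and a group "my-group" that has committed an offset for partition 0 of topic "my-topic"; these names and the object CommittedOffsetReadExample are hypothetical.

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer}

    object CommittedOffsetReadExample extends App {
      // Same 30s session/connection timeouts used for ZkClient elsewhere in this patch.
      val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
      val dirs = new ZKGroupTopicDirs("my-group", "my-topic")
      // handleOffsetCommitRequest persists the offset as a string at <consumerOffsetDir>/<partition>.
      val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, dirs.consumerOffsetDir + "/0")._1
      payloadOpt match {
        case Some(offset) => println("committed offset: " + offset.toLong)
        case None         => println("no committed offset recorded")
      }
      zkClient.close()
    }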

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/KafkaServer.scala
index e2f4e91,b4a57c6..5f3b92c
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@@ -72,41 -66,34 +72,43 @@@ class KafkaServer(val config: KafkaConf
                                      config.port,
                                      config.numNetworkThreads,
                                      config.queuedMaxRequests,
+                                     config.socketSendBufferBytes,
+                                     config.socketReceiveBufferBytes,
                                      config.socketRequestMaxBytes)
 +    socketServer.startup()
  
 -    socketServer.startup
 -
 -    /* start client */
 -    kafkaZookeeper = new KafkaZooKeeper(config)
 -    // starting relevant replicas and leader election for partitions assigned to this broker
 -    kafkaZookeeper.startup
 -
 -    info("Connecting to ZK: " + config.zkConnect)
 -
 -    replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager)
 -
 -    kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient)
 -    apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId)
 +    replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager)
 +    kafkaController = new KafkaController(config, zkClient)
 +    
 +    /* start processing requests */
 +    apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config)
      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
 -    Mx4jLoader.maybeLoad
 +   
 +    Mx4jLoader.maybeLoad()
  
 -    // start the replica manager
      replicaManager.startup()
 -    // start the controller
 +
      kafkaController.startup()
 -    // register metrics beans
 +    
 +    topicConfigManager = new TopicConfigManager(zkClient, logManager)
 +    topicConfigManager.startup()
 +    
 +    /* tell everyone we are alive */
 +    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.hostName, config.port, zkClient)
 +    kafkaHealthcheck.startup()
 +
 +    
      registerStats()
 +    
      info("started")
    }
 +  
 +  private def initZk(): ZkClient = {
 +    info("Connecting to zookeeper on " + config.zkConnect)
 +    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
 +    ZkUtils.setupCommonPaths(zkClient)
 +    zkClient
 +  }
  
    /**
     *  Forces some dynamic jmx beans to be registered on server startup.

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 99e6f4e,03f621a..018c76f
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@@ -92,7 -94,9 +94,9 @@@ class ReplicaFetcherThread(name:String
         * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.
         */
        val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
 -      log.truncateAndStartWithNewOffset(leaderStartOffset)
 +      log.truncateFullyAndStartAt(leaderStartOffset)
+       warn("Replica %d for partition %s reset its fetch offset to current leader %d's start offset %d"
+         .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderStartOffset))
        leaderStartOffset
      }
    }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/ReplicaManager.scala
index 477f60e,89ad4d7..a7b2146
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@@ -182,8 -180,8 +182,8 @@@ class ReplicaManager(val config: KafkaC
          partition.leaderReplicaIfLocal match {
            case Some(leaderReplica) => leaderReplica
            case None =>
-             throw new NotLeaderForPartitionException("Leader not local for topic %s partition %d on broker %d"
+             throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
 -                    .format(topic, partitionId, config.brokerId))
 +                                                     .format(topic, partitionId, config.brokerId))
          }
      }
    }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/utils/ZkUtils.scala
index 8c68821,4f6fcd4..c230b65
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@@ -64,17 -59,20 +64,17 @@@ object ZkUtils extends Logging 
      }
    }
  
 -  def getTopicPartitionPath(topic: String, partitionId: Int): String ={
 +  def getTopicPartitionPath(topic: String, partitionId: Int): String =
      getTopicPartitionsPath(topic) + "/" + partitionId
 -  }
  
 -  def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String ={
 +  def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String =
      getTopicPartitionPath(topic, partitionId) + "/" + "state"
 -  }
  
 -  def getSortedBrokerList(zkClient: ZkClient): Seq[Int] ={
 +  def getSortedBrokerList(zkClient: ZkClient): Seq[Int] =
      ZkUtils.getChildren(zkClient, BrokerIdsPath).map(_.toInt).sorted
 -  }
  
    def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
-     val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
+     val brokerIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath).sorted
      brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
    }
  
@@@ -184,10 -177,10 +184,10 @@@
  
    def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
      val replicas = getReplicasForPartition(zkClient, topic, partition)
-     debug("The list of replicas for topic %s, partition %d is %s".format(topic, partition, replicas))
+     debug("The list of replicas for partition [%s,%d] is %s".format(topic, partition, replicas))
      replicas.contains(brokerId.toString)
    }
 -
 +    
    def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) {
      val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
      val brokerInfo =

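For readers skimming the ZkUtils hunk above, the refactored helpers are one-line builders over fixed ZooKeeper paths. Below is a minimal, self-contained sketch of the path layout they produce; the /brokers/... constants are assumptions meant to mirror this branch, not copied from the commit.

object ZkPathSketch {
  // Assumed constants; ZkUtils in this branch defines the real ones.
  val BrokerIdsPath = "/brokers/ids"
  val BrokerTopicsPath = "/brokers/topics"

  def getTopicPath(topic: String): String =
    BrokerTopicsPath + "/" + topic

  def getTopicPartitionsPath(topic: String): String =
    getTopicPath(topic) + "/partitions"

  def getTopicPartitionPath(topic: String, partitionId: Int): String =
    getTopicPartitionsPath(topic) + "/" + partitionId

  def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String =
    getTopicPartitionPath(topic, partitionId) + "/" + "state"

  def main(args: Array[String]): Unit = {
    // Prints: /brokers/topics/new-topic/partitions/0/state
    println(getTopicPartitionLeaderAndIsrPath("new-topic", 0))
  }
}
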
http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/admin/AdminTest.scala
index b73e5d4,b0a0e09..1754c25
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@@ -336,6 -380,8 +336,7 @@@ class AdminTest extends JUnit3Suite wit
        var topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
        var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
        assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
 -      // assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
+       assertEquals(2, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
  
        leaderBeforeShutdown = leaderAfterShutdown
        controllerId = ZkUtils.getController(zkClient)
@@@ -345,7 -391,8 +346,7 @@@
        topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
        leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
        assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
-       assertEquals(1, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
 -      // assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
+       assertEquals(1, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
  
        leaderBeforeShutdown = leaderAfterShutdown
        controllerId = ZkUtils.getController(zkClient)
@@@ -355,8 -402,9 +356,8 @@@
        topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
        leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
        assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
-       assertEquals(1, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+       assertEquals(1, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
 -    }
 -    finally {
 +    } finally {
        servers.foreach(_.shutdown())
      }
    }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 845b966,c5cddea..67ed201
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@@ -55,10 -55,10 +55,10 @@@ class FetcherTest extends JUnit3Suite w
  
    override def setUp() {
      super.setUp
 -    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
 +    AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)))
      waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
      fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
-     fetcher.stopAllConnections()
+     fetcher.stopConnections()
      fetcher.startConnections(topicInfos, cluster)
    }
  

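The FetcherTest change above replaces CreateTopicCommand.createTopic with AdminUtils.createTopicWithAssignment, which takes an explicit partition-to-replica-list map. A small sketch of building such maps follows; it only constructs and prints them (the real call needs a running ZooKeeper), and the broker id is a placeholder.

object AssignmentMapSketch {
  def main(args: Array[String]): Unit = {
    val brokerId = 0 // placeholder broker id
    // Single partition 0 hosted on one replica, as in the hunk above.
    val onePartition: Map[Int, Seq[Int]] = Map(0 -> Seq(brokerId))
    // Four partitions all on the same broker, as in the ProducerTest hunk further down.
    val fourPartitions: Map[Int, Seq[Int]] = (0 until 4).map(p => p -> Seq(brokerId)).toMap
    println(onePartition)
    println(fourPartitions)
  }
}
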
http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 507e6a8,bc37531..1d82598
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@@ -198,10 -200,10 +198,10 @@@ class ProducerTest extends JUnit3Suite 
      props.put("partitioner.class", "kafka.utils.StaticPartitioner")
      props.put("request.timeout.ms", "2000")
      props.put("request.required.acks", "1")
-     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+     props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
  
      // create topic
 -    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
 +    AdminUtils.createTopicWithAssignment(zkClient, "new-topic", Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0)))
      assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
        AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
@@@ -296,28 -303,5 +296,28 @@@
      // we do this because the DefaultEventHandler retries a number of times
      assertTrue((t2-t1) >= timeoutMs*config.messageSendMaxRetries)
    }
 +  
 +  @Test
 +  def testSendNullMessage() {
 +    val props = new Properties()
 +    props.put("serializer.class", "kafka.serializer.StringEncoder")
 +    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
++    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
 +    
 +    val config = new ProducerConfig(props)
 +    val producer = new Producer[String, String](config)
 +    try {
 +
 +      // create topic
 +      AdminUtils.createTopic(zkClient, "new-topic", 2, 1)
 +      assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
 +        AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
 +      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 +    
 +      producer.send(new KeyedMessage[String, String]("new-topic", "key", null))
 +    } finally {
 +      producer.close()
 +    }
 +  }
  }
  

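Both ProducerTest hunks above also pick up the producer config rename from broker.list to metadata.broker.list. A minimal sketch of a producer Properties block using the new key; the host:port values are placeholders, not taken from the commit.

import java.util.Properties

object ProducerPropsSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("request.required.acks", "1")
    // Renamed key in this merge; previously "broker.list".
    props.put("metadata.broker.list", "localhost:9092,localhost:9093") // placeholder brokers
    println(props)
  }
}
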
http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 6801f4e,0000000..9aea67b
mode 100644,000000..100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@@ -1,219 -1,0 +1,219 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + * 
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package kafka.server
 +
 +import java.io.File
 +import kafka.utils._
 +import junit.framework.Assert._
 +import java.util.{Random, Properties}
 +import kafka.consumer.SimpleConsumer
 +import org.junit.{After, Before, Test}
 +import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 +import kafka.zk.ZooKeeperTestHarness
 +import org.scalatest.junit.JUnit3Suite
 +import kafka.admin.AdminUtils
 +import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 +import kafka.utils.TestUtils._
 +import kafka.common.{ErrorMapping, TopicAndPartition}
 +import kafka.utils.nonthreadsafe
 +import kafka.utils.threadsafe
 +import org.junit.After
 +import org.junit.Before
 +import org.junit.Test
 +
 +class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
 +  val random = new Random() 
 +  var logDir: File = null
 +  var topicLogDir: File = null
 +  var server: KafkaServer = null
 +  var logSize: Int = 100
 +  val brokerPort: Int = 9099
 +  var simpleConsumer: SimpleConsumer = null
 +  var time: Time = new MockTime()
 +
 +  @Before
 +  override def setUp() {
 +    super.setUp()
 +    val config: Properties = createBrokerConfig(1, brokerPort)
 +    val logDirPath = config.getProperty("log.dir")
 +    logDir = new File(logDirPath)
 +    time = new MockTime()
 +    server = TestUtils.createServer(new KafkaConfig(config), time)
 +    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "")
 +  }
 +
 +  @After
 +  override def tearDown() {
 +    simpleConsumer.close
 +    server.shutdown
 +    Utils.rm(logDir)
 +    super.tearDown()
 +  }
 +
 +  @Test
 +  def testGetOffsetsForUnknownTopic() {
 +    val topicAndPartition = TopicAndPartition("foo", 0)
 +    val request = OffsetRequest(
 +      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)))
 +    val offsetResponse = simpleConsumer.getOffsetsBefore(request)
 +    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
 +                 offsetResponse.partitionErrorAndOffsets(topicAndPartition).error)
 +  }
 +
 +  @Test
 +  def testGetOffsetsBeforeLatestTime() {
 +    val topicPartition = "kafka-" + 0
 +    val topic = topicPartition.split("-").head
 +    val part = Integer.valueOf(topicPartition.split("-").last).intValue
 +
 +    // setup brokers in zookeeper as owners of partitions for this test
 +    AdminUtils.createTopic(zkClient, topic, 1, 1)
 +
 +    val logManager = server.getLogManager
 +    val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
 +
 +    val message = new Message(Integer.toString(42).getBytes())
 +    for(i <- 0 until 20)
 +      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
 +    log.flush()
 +
 +    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime, 10)
 +    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
 +
 +    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
 +    val topicAndPartition = TopicAndPartition(topic, part)
 +    val offsetRequest = OffsetRequest(
 +      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)),
 +      replicaId = 0)
 +    val consumerOffsets =
 +      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
 +    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
 +
 +    // try to fetch using latest offset
 +    val fetchResponse = simpleConsumer.fetch(
 +      new FetchRequestBuilder().addFetch(topic, 0, consumerOffsets.head, 300 * 1024).build())
 +    assertFalse(fetchResponse.messageSet(topic, 0).iterator.hasNext)
 +  }
 +
 +  @Test
 +  def testEmptyLogsGetOffsets() {
 +    val topicPartition = "kafka-" + random.nextInt(10)
 +    val topicPartitionPath = getLogDir.getAbsolutePath + "/" + topicPartition
 +    topicLogDir = new File(topicPartitionPath)
 +    topicLogDir.mkdir
 +
 +    val topic = topicPartition.split("-").head
 +
 +    // setup brokers in zookeeper as owners of partitions for this test
 +    AdminUtils.createTopic(zkClient, topic, 1, 1)
 +    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
 +
 +    var offsetChanged = false
 +    for(i <- 1 to 14) {
 +      val topicAndPartition = TopicAndPartition(topic, 0)
 +      val offsetRequest =
 +        OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
 +      val consumerOffsets =
 +        simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
 +
 +      if(consumerOffsets(0) == 1) {
 +        offsetChanged = true
 +      }
 +    }
 +    assertFalse(offsetChanged)
 +  }
 +
 +  @Test
 +  def testGetOffsetsBeforeNow() {
 +    val topicPartition = "kafka-" + random.nextInt(3)
 +    val topic = topicPartition.split("-").head
 +    val part = Integer.valueOf(topicPartition.split("-").last).intValue
 +
 +    // setup brokers in zookeeper as owners of partitions for this test
 +    AdminUtils.createTopic(zkClient, topic, 3, 1)
 +
 +    val logManager = server.getLogManager
 +    val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
 +    val message = new Message(Integer.toString(42).getBytes())
 +    for(i <- 0 until 20)
 +      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
 +    log.flush()
 +
 +    val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
 +
 +    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now, 10)
 +    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
 +
 +    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
 +    val topicAndPartition = TopicAndPartition(topic, part)
 +    val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
 +    val consumerOffsets =
 +      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
 +    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
 +  }
 +
 +  @Test
 +  def testGetOffsetsBeforeEarliestTime() {
 +    val topicPartition = "kafka-" + random.nextInt(3)
 +    val topic = topicPartition.split("-").head
 +    val part = Integer.valueOf(topicPartition.split("-").last).intValue
 +
 +    // setup brokers in zookeeper as owners of partitions for this test
 +    AdminUtils.createTopic(zkClient, topic, 3, 1)
 +
 +    val logManager = server.getLogManager
 +    val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
 +    val message = new Message(Integer.toString(42).getBytes())
 +    for(i <- 0 until 20)
 +      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
 +    log.flush()
 +
 +    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.EarliestTime, 10)
 +
 +    assertEquals(Seq(0L), offsets)
 +
 +    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
 +    val topicAndPartition = TopicAndPartition(topic, part)
 +    val offsetRequest =
 +      OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10)))
 +    val consumerOffsets =
 +      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
 +    assertEquals(Seq(0L), consumerOffsets)
 +  }
 +
 +  private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
 +    val props = new Properties
 +    props.put("broker.id", nodeId.toString)
 +    props.put("port", port.toString)
 +    props.put("log.dir", getLogDir.getAbsolutePath)
 +    props.put("log.flush.interval.messages", "1")
 +    props.put("enable.zookeeper", "false")
 +    props.put("num.partitions", "20")
 +    props.put("log.retention.hours", "10")
 +    props.put("log.retention.check.interval.ms", (5*1000*60).toString)
 +    props.put("log.segment.bytes", logSize.toString)
-     props.put("zk.connect", zkConnect.toString)
++    props.put("zookeeper.connect", zkConnect.toString)
 +    props
 +  }
 +
 +  private def getLogDir(): File = {
 +    val dir = TestUtils.tempDir()
 +    dir
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 40bfacb,f9c9e64..00ea98f
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@@ -129,7 -127,8 +129,7 @@@ object TestUtils extends Logging 
      props.put("host.name", "localhost")
      props.put("port", port.toString)
      props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
-     props.put("zk.connect", TestZKUtils.zookeeperConnect)
 -    props.put("log.flush.interval.messages", "1")
+     props.put("zookeeper.connect", TestZKUtils.zookeeperConnect)
      props.put("replica.socket.timeout.ms", "1500")
      props
    }

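The LogOffsetTest and TestUtils hunks above both switch the test broker config key from zk.connect to zookeeper.connect. A minimal sketch of a broker Properties block using the new key; all values are placeholders.

import java.util.Properties

object BrokerPropsSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("broker.id", "1")
    props.put("port", "9099")
    props.put("log.dir", "/tmp/kafka-logs") // placeholder directory
    // Renamed key in this merge; previously "zk.connect".
    props.put("zookeeper.connect", "localhost:2181") // placeholder connect string
    println(props)
  }
}
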
http://git-wip-us.apache.org/repos/asf/kafka/blob/c98bdd3e/examples/src/main/java/kafka/examples/Consumer.java
----------------------------------------------------------------------

