kafka-commits mailing list archives

From: jun...@apache.org
Subject: [27/27] git commit: kafka-823; merge 0.8 (51421fcc0111031bb77f779a6f6c00520d526a34) to trunk; patched by Jun Rao; reviewed by Jay Kreps
Date: Thu, 18 Apr 2013 04:54:21 GMT
kafka-823; merge 0.8 (51421fcc0111031bb77f779a6f6c00520d526a34) to trunk; patched by Jun Rao; reviewed by Jay Kreps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/731ba900
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/731ba900
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/731ba900

Branch: refs/heads/trunk
Commit: 731ba9007d4e2fa2f3a5e44f7b30827975fca420
Parents: 9ff4e8e 51421fc
Author: Jun Rao <junrao@gmail.com>
Authored: Wed Apr 17 21:52:04 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Apr 17 21:52:04 2013 -0700

----------------------------------------------------------------------
 bin/kafka-run-class.sh                             |    2 +-
 config/log4j.properties                            |   11 +-
 core/src/main/scala/kafka/admin/AdminUtils.scala   |   36 ++-
 .../PreferredReplicaLeaderElectionCommand.scala    |   51 ++--
 .../kafka/admin/ReassignPartitionsCommand.scala    |   31 +--
 core/src/main/scala/kafka/api/FetchRequest.scala   |    4 +-
 .../main/scala/kafka/api/LeaderAndIsrRequest.scala |   21 +-
 .../scala/kafka/api/LeaderAndIsrResponse.scala     |    4 +-
 .../main/scala/kafka/api/OffsetCommitRequest.scala |    4 +-
 .../scala/kafka/api/OffsetCommitResponse.scala     |    4 +-
 .../main/scala/kafka/api/OffsetFetchRequest.scala  |    4 +-
 .../main/scala/kafka/api/OffsetFetchResponse.scala |    4 +-
 core/src/main/scala/kafka/api/OffsetRequest.scala  |    4 +-
 core/src/main/scala/kafka/api/OffsetResponse.scala |    4 +-
 .../src/main/scala/kafka/api/ProducerRequest.scala |    5 +-
 .../main/scala/kafka/api/ProducerResponse.scala    |    5 +-
 .../main/scala/kafka/api/RequestOrResponse.scala   |    2 +-
 .../main/scala/kafka/api/StopReplicaRequest.scala  |   20 +-
 .../main/scala/kafka/api/StopReplicaResponse.scala |    5 +-
 .../scala/kafka/api/TopicMetadataRequest.scala     |    4 +-
 .../scala/kafka/api/TopicMetadataResponse.scala    |    3 +-
 core/src/main/scala/kafka/cluster/Partition.scala  |   57 +++--
 core/src/main/scala/kafka/common/Topic.scala       |    2 +-
 .../scala/kafka/consumer/ConsoleConsumer.scala     |    5 +
 .../main/scala/kafka/consumer/ConsumerConfig.scala |    9 +-
 .../kafka/consumer/ConsumerFetcherThread.scala     |    7 +-
 .../scala/kafka/consumer/ConsumerIterator.scala    |    7 +-
 .../main/scala/kafka/consumer/KafkaStream.scala    |    3 +-
 .../main/scala/kafka/consumer/SimpleConsumer.scala |   70 ++----
 .../consumer/ZookeeperConsumerConnector.scala      |    7 +-
 .../controller/ControllerChannelManager.scala      |   40 ++--
 .../scala/kafka/controller/KafkaController.scala   |    5 +-
 .../kafka/controller/PartitionStateMachine.scala   |   81 ++++---
 .../kafka/controller/ReplicaStateMachine.scala     |   33 ++-
 .../scala/kafka/javaapi/TopicMetadataRequest.scala |    7 +-
 core/src/main/scala/kafka/log/Log.scala            |   24 +-
 .../scala/kafka/metrics/KafkaMetricsGroup.scala    |    2 +-
 .../scala/kafka/producer/BrokerPartitionInfo.scala |   10 +-
 .../main/scala/kafka/producer/ProducerPool.scala   |    4 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |    3 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   35 +++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |    2 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   43 +++-
 .../main/scala/kafka/server/ReplicaManager.scala   |   81 ++++--
 .../main/scala/kafka/tools/DumpLogSegments.scala   |  110 ++++++---
 core/src/main/scala/kafka/tools/JmxTool.scala      |    4 +-
 .../main/scala/kafka/tools/KafkaMigrationTool.java |   18 ++-
 .../scala/kafka/tools/SimpleConsumerShell.scala    |   45 +++-
 .../scala/kafka/tools/StateChangeLogMerger.scala   |  188 +++++++++++++++
 core/src/main/scala/kafka/utils/Utils.scala        |   72 ++----
 core/src/main/scala/kafka/utils/ZkUtils.scala      |   51 +++--
 .../api/RequestResponseSerializationTest.scala     |    5 +-
 .../unit/kafka/consumer/ConsumerIteratorTest.scala |    1 -
 .../unit/kafka/log/LogCleanerIntegrationTest.scala |    2 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   |    2 +-
 .../unit/kafka/server/LeaderElectionTest.scala     |   12 +-
 .../test/scala/unit/kafka/utils/UtilsTest.scala    |    3 +-
 .../kafka/perf/SimpleConsumerPerformance.scala     |    2 +-
 .../testcase_9001/testcase_9001_properties.json    |    6 +-
 .../testcase_9003/testcase_9003_properties.json    |    6 +-
 .../testcase_9004/testcase_9004_properties.json    |    6 +-
 .../testcase_9005/testcase_9005_properties.json    |    6 +-
 .../testcase_9006/testcase_9006_properties.json    |    6 +-
 .../testcase_5001/testcase_5001_properties.json    |   12 +-
 .../testcase_5002/testcase_5002_properties.json    |   12 +-
 .../testcase_5003/testcase_5003_properties.json    |   12 +-
 .../testcase_5004/testcase_5004_properties.json    |   12 +-
 .../testcase_5005/testcase_5005_properties.json    |   12 +-
 .../testcase_5006/testcase_5006_properties.json    |   12 +-
 .../testcase_0001/testcase_0001_properties.json    |   12 +-
 .../testcase_0002/testcase_0002_properties.json    |   12 +-
 .../testcase_0003/testcase_0003_properties.json    |   12 +-
 .../testcase_0004/testcase_0004_properties.json    |   12 +-
 .../testcase_0005/testcase_0005_properties.json    |   12 +-
 .../testcase_0006/testcase_0006_properties.json    |   12 +-
 .../testcase_0007/testcase_0007_properties.json    |   12 +-
 .../testcase_0008/testcase_0008_properties.json    |   12 +-
 .../testcase_0009/testcase_0009_properties.json    |   12 +-
 .../testcase_0010/testcase_0010_properties.json    |   12 +-
 .../testcase_0021/testcase_0021_properties.json    |   12 +-
 .../testcase_0022/testcase_0022_properties.json    |   12 +-
 .../testcase_0023/testcase_0023_properties.json    |   12 +-
 .../testcase_0101/testcase_0101_properties.json    |   12 +-
 .../testcase_0102/testcase_0102_properties.json    |   12 +-
 .../testcase_0103/testcase_0103_properties.json    |   12 +-
 .../testcase_0104/testcase_0104_properties.json    |   12 +-
 .../testcase_0105/testcase_0105_properties.json    |   12 +-
 .../testcase_0106/testcase_0106_properties.json    |   12 +-
 .../testcase_0107/testcase_0107_properties.json    |   12 +-
 .../testcase_0108/testcase_0108_properties.json    |   12 +-
 .../testcase_0109/testcase_0109_properties.json    |   12 +-
 .../testcase_0110/testcase_0110_properties.json    |   12 +-
 .../testcase_0111/testcase_0111_properties.json    |   13 +-
 .../testcase_0112/testcase_0112_properties.json    |   13 +-
 .../testcase_0113/testcase_0113_properties.json    |   13 +-
 .../testcase_0114/testcase_0114_properties.json    |   13 +-
 .../testcase_0115/testcase_0115_properties.json    |   13 +-
 .../testcase_0116/testcase_0116_properties.json    |   13 +-
 .../testcase_0117/testcase_0117_properties.json    |   13 +-
 .../testcase_0118/testcase_0118_properties.json    |   13 +-
 .../testcase_0121/testcase_0121_properties.json    |   13 +-
 .../testcase_0122/testcase_0122_properties.json    |   13 +-
 .../testcase_0123/testcase_0123_properties.json    |   13 +-
 .../testcase_0124/testcase_0124_properties.json    |   13 +-
 .../testcase_0125/testcase_0125_properties.json    |   13 +-
 .../testcase_0126/testcase_0126_properties.json    |   13 +-
 .../testcase_0127/testcase_0127_properties.json    |   13 +-
 .../testcase_0131/testcase_0131_properties.json    |   13 +-
 .../testcase_0132/testcase_0132_properties.json    |   13 +-
 .../testcase_0133/testcase_0133_properties.json    |   13 +-
 .../testcase_0151/testcase_0151_properties.json    |   13 +-
 .../testcase_0152/testcase_0152_properties.json    |   13 +-
 .../testcase_0153/testcase_0153_properties.json    |   13 +-
 .../testcase_0154/testcase_0154_properties.json    |   13 +-
 .../testcase_0155/testcase_0155_properties.json    |   13 +-
 .../testcase_0156/testcase_0156_properties.json    |   13 +-
 .../testcase_0157/testcase_0157_properties.json    |   13 +-
 .../testcase_0158/testcase_0158_properties.json    |   13 +-
 .../testcase_0201/testcase_0201_properties.json    |   12 +-
 .../testcase_0202/testcase_0202_properties.json    |   12 +-
 .../testcase_0203/testcase_0203_properties.json    |   12 +-
 .../testcase_0204/testcase_0204_properties.json    |   12 +-
 .../testcase_0205/testcase_0205_properties.json    |   12 +-
 .../testcase_0206/testcase_0206_properties.json    |   12 +-
 .../testcase_0207/testcase_0207_properties.json    |   12 +-
 .../testcase_0208/testcase_0208_properties.json    |   12 +-
 .../testcase_0251/testcase_0251_properties.json    |   12 +-
 .../testcase_0252/testcase_0252_properties.json    |   12 +-
 .../testcase_0253/testcase_0253_properties.json    |   12 +-
 .../testcase_0254/testcase_0254_properties.json    |   12 +-
 .../testcase_0255/testcase_0255_properties.json    |   12 +-
 .../testcase_0256/testcase_0256_properties.json    |   12 +-
 .../testcase_0257/testcase_0257_properties.json    |   12 +-
 .../testcase_0258/testcase_0258_properties.json    |   12 +-
 .../testcase_0301/testcase_0301_properties.json    |   12 +-
 .../testcase_0302/testcase_0302_properties.json    |   12 +-
 .../testcase_0303/testcase_0303_properties.json    |   12 +-
 .../testcase_0304/testcase_0304_properties.json    |   12 +-
 .../testcase_0305/testcase_0305_properties.json    |   12 +-
 .../testcase_0306/testcase_0306_properties.json    |   12 +-
 .../testcase_0307/testcase_0307_properties.json    |   12 +-
 .../testcase_0308/testcase_0308_properties.json    |   12 +-
 .../testcase_1/testcase_1_properties.json          |    6 +-
 .../testcase_4001/testcase_4001_properties.json    |   12 +-
 .../testcase_4002/testcase_4002_properties.json    |   12 +-
 .../testcase_4003/testcase_4003_properties.json    |   12 +-
 .../testcase_4004/testcase_4004_properties.json    |   12 +-
 .../testcase_4005/testcase_4005_properties.json    |   12 +-
 .../testcase_4006/testcase_4006_properties.json    |   12 +-
 .../testcase_4007/testcase_4007_properties.json    |   12 +-
 .../testcase_4008/testcase_4008_properties.json    |   12 +-
 .../testcase_4011/testcase_4011_properties.json    |   12 +-
 .../testcase_4012/testcase_4012_properties.json    |   12 +-
 .../testcase_4013/testcase_4013_properties.json    |   12 +-
 .../testcase_4014/testcase_4014_properties.json    |   12 +-
 .../testcase_4015/testcase_4015_properties.json    |   12 +-
 .../testcase_4016/testcase_4016_properties.json    |   12 +-
 .../testcase_4017/testcase_4017_properties.json    |   12 +-
 .../testcase_4018/testcase_4018_properties.json    |   12 +-
 .../testcase_9051/testcase_9051_properties.json    |    6 +-
 160 files changed, 1700 insertions(+), 788 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/config/log4j.properties
----------------------------------------------------------------------
diff --cc config/log4j.properties
index 1891f38,b76bc94..c611786
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@@ -36,12 -36,12 +36,18 @@@ log4j.appender.requestAppender.File=kaf
  log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
  log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
  
 +log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
 +log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
 +log4j.appender.cleanerAppender.File=log-cleaner.log
 +log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
 +log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 +
+ log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
+ log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
+ log4j.appender.controllerAppender.File=controller.log
+ log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+ log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+ 
  # Turn on all our debugging info
  #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
  #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
@@@ -56,13 -56,13 +62,16 @@@ log4j.additivity.kafka.network.RequestC
  #log4j.logger.kafka.network.Processor=TRACE, requestAppender
  #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
  #log4j.additivity.kafka.server.KafkaApis=false
 -log4j.logger.kafka.request.logger=TRACE, requestAppender
 +log4j.logger.kafka.request.logger=WARN, requestAppender
  log4j.additivity.kafka.request.logger=false
  
- log4j.logger.kafka.controller=TRACE, stateChangeAppender
+ log4j.logger.kafka.controller=TRACE, controllerAppender
  log4j.additivity.kafka.controller=false
  
 +log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
 +log4j.additivity.kafka.log.LogCleaner=false
 +log4j.logger.kafka.log.Cleaner=INFO, cleanerAppender
 +log4j.additivity.kafka.log.Cleaner=false
++
+ log4j.logger.state.change.logger=TRACE, stateChangeAppender
+ log4j.additivity.state.change.logger=false
 -
 -

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/admin/AdminUtils.scala
index 6479385,f4bf3b9..b896182
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@@ -253,9 -167,10 +264,10 @@@ object AdminUtils extends Logging 
            }
        }
      }
+     brokerMetadata.filter(_.isDefined).map(_.get)
    }
  
 -  private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
 +  private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
      val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
      (firstReplicaIndex + shift) % nBrokers
    }
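
A minimal, standalone sketch (not part of this patch) of the shift formula used by the
renamed replicaIndex method above, showing where follower replicas land for a
hypothetical 5-broker cluster; the numbers are made up for illustration:

    object ReplicaIndexSketch {
      // Same formula as replicaIndex above, copied here for illustration only.
      def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
        val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
        (firstReplicaIndex + shift) % nBrokers
      }

      def main(args: Array[String]) {
        val nBrokers = 5
        // With the leader on broker 0 and shift 0, the two followers land on brokers 1 and 2;
        // a larger secondReplicaShift spreads them onto later brokers.
        for (i <- 0 until 2)
          println("follower %d -> broker %d".format(i, replicaIndex(0, 0, i, nBrokers)))
      }
    }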

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 49342c6,7405c5a..a2afd16
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@@ -76,16 -69,20 +69,20 @@@ object PreferredReplicaLeaderElectionCo
      }
    }
  
-   def parsePreferredReplicaJsonData(jsonString: String): Set[TopicAndPartition] = {
+   def parsePreferredReplicaJsonData(jsonString: String): immutable.Set[TopicAndPartition] = {
      Json.parseFull(jsonString) match {
-       case Some(partitionList) =>
-         val partitions = (partitionList.asInstanceOf[List[Any]])
-         Set.empty[TopicAndPartition] ++ partitions.map { m =>
-           val topic = m.asInstanceOf[Map[String, String]].get("topic").get
-           val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
-           TopicAndPartition(topic, partition)
+       case Some(m) =>
+         m.asInstanceOf[Map[String, Any]].get("partitions") match {
+           case Some(partitionsList) =>
+             val partitions = partitionsList.asInstanceOf[List[Map[String, Any]]]
+             partitions.map { p =>
+               val topic = p.get("topic").get.asInstanceOf[String]
+               val partition = p.get("partition").get.asInstanceOf[Int]
+               TopicAndPartition(topic, partition)
+             }.toSet
 -          case None => throw new AdministrationException("Preferred replica election data is empty")
++          case None => throw new AdminOperationException("Preferred replica election data is empty")
          }
 -      case None => throw new AdministrationException("Preferred replica election data is empty")
 +      case None => throw new AdminOperationException("Preferred replica election data is empty")
      }
    }
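
For reference, a sketch (not part of this patch) of the JSON layout the reworked parser
above expects; the "partitions", "topic", and "partition" field names come straight from
the hunk, while the topic and partition values are made up:

    import kafka.admin.PreferredReplicaLeaderElectionCommand

    object PreferredReplicaJsonSketch {
      def main(args: Array[String]) {
        val json =
          """{"partitions": [{"topic": "topic1", "partition": 0},
                             {"topic": "topic1", "partition": 1}]}"""
        // Expected result: a Set of TopicAndPartition(topic1,0) and TopicAndPartition(topic1,1)
        println(PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(json))
      }
    }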
  

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 1ca37e2,0000000..1cbe6e8
mode 100644,000000..100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@@ -1,109 -1,0 +1,109 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + * 
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package kafka.api
 +
 +import java.nio.ByteBuffer
 +
 +import kafka.api.ApiUtils._
 +import kafka.utils.Logging
 +import kafka.network.{RequestChannel, BoundedByteBufferSend}
 +import kafka.common.{ErrorMapping, TopicAndPartition, OffsetMetadataAndError}
 +import kafka.network.RequestChannel.Response
 +object OffsetCommitRequest extends Logging {
 +  val CurrentVersion: Short = 0
 +  val DefaultClientId = ""
 +
 +  def readFrom(buffer: ByteBuffer): OffsetCommitRequest = {
 +    // Read values from the envelope
 +    val versionId = buffer.getShort
 +    val correlationId = buffer.getInt
 +    val clientId = readShortString(buffer)
 +
 +    // Read the OffsetRequest 
 +    val consumerGroupId = readShortString(buffer)
 +    val topicCount = buffer.getInt
 +    val pairs = (1 to topicCount).flatMap(_ => {
 +      val topic = readShortString(buffer)
 +      val partitionCount = buffer.getInt
 +      (1 to partitionCount).map(_ => {
 +        val partitionId = buffer.getInt
 +        val offset = buffer.getLong
 +        val metadata = readShortString(buffer)
 +        (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata))
 +      })
 +    })
 +    OffsetCommitRequest(consumerGroupId, Map(pairs:_*), versionId, correlationId, clientId)
 +  }
 +}
 +
 +case class OffsetCommitRequest(groupId: String,
 +                               requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
 +                               versionId: Short = OffsetCommitRequest.CurrentVersion,
-                                correlationId: Int = 0,
++                               override val correlationId: Int = 0,
 +                               clientId: String = OffsetCommitRequest.DefaultClientId)
-     extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) {
++    extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey), correlationId) {
 +
 +  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 +  
 +  def writeTo(buffer: ByteBuffer) {
 +    // Write envelope
 +    buffer.putShort(versionId)
 +    buffer.putInt(correlationId)
 +    writeShortString(buffer, clientId)
 +
 +    // Write OffsetCommitRequest
 +    writeShortString(buffer, groupId)             // consumer group
 +    buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
 +    requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, OffsetMetadataAndError]
 +      writeShortString(buffer, t1._1) // topic
 +      buffer.putInt(t1._2.size)       // number of partitions for this topic
 +      t1._2.foreach( t2 => {
 +        buffer.putInt(t2._1.partition)  // partition
 +        buffer.putLong(t2._2.offset)    // offset
 +        writeShortString(buffer, t2._2.metadata) // metadata
 +      })
 +    })
 +  }
 +
 +  override def sizeInBytes =
 +    2 + /* versionId */
 +    4 + /* correlationId */
 +    shortStringLength(clientId) +
 +    shortStringLength(groupId) + 
 +    4 + /* topic count */
 +    requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
 +      val (topic, offsets) = topicAndOffsets
 +      count +
 +      shortStringLength(topic) + /* topic */
 +      4 + /* number of partitions */
 +      offsets.foldLeft(0)((innerCount, offsetAndMetadata) => {
 +        innerCount +
 +        4 /* partition */ +
 +        8 /* offset */ +
 +        shortStringLength(offsetAndMetadata._2.metadata)
 +      })
 +    })
 +
 +  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
 +    val responseMap = requestInfo.map {
 +      case (topicAndPartition, offset) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
 +    }.toMap
 +    val errorResponse = OffsetCommitResponse(requestInfo=responseMap, correlationId=correlationId)
 +    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
 +  }
 +}
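
A usage sketch (not part of this patch) that serializes a one-partition commit request via
the writeTo/sizeInBytes pair defined above; the group, topic, offset, and client values are
made up for illustration:

    import java.nio.ByteBuffer
    import kafka.api.OffsetCommitRequest
    import kafka.common.{OffsetMetadataAndError, TopicAndPartition}

    object OffsetCommitRequestSketch {
      def main(args: Array[String]) {
        val request = OffsetCommitRequest(
          groupId = "example-group",
          requestInfo = Map(TopicAndPartition("topic1", 0) -> OffsetMetadataAndError(42L, "checkpoint")),
          correlationId = 1,
          clientId = "example-client")
        // Layout written by writeTo: versionId, correlationId, clientId, groupId,
        // then per topic: topic, partition count, and (partition, offset, metadata) triples.
        val buffer = ByteBuffer.allocate(request.sizeInBytes)
        request.writeTo(buffer)
        println("serialized %d bytes".format(buffer.position))
      }
    }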

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index 3b0d861,0000000..cbb5fa1
mode 100644,000000..100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@@ -1,89 -1,0 +1,89 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + * 
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package kafka.api
 +
 +import java.nio.ByteBuffer
 +
 +import kafka.api.ApiUtils._
 +import kafka.common.TopicAndPartition
 +import kafka.utils.Logging
 +
 +object OffsetCommitResponse extends Logging {
 +  val CurrentVersion: Short = 0
 +  val DefaultClientId = ""
 +
 +  def readFrom(buffer: ByteBuffer): OffsetCommitResponse = {
 +    // Read values from the envelope
 +    val correlationId = buffer.getInt
 +    val clientId = readShortString(buffer)
 +
 +    // Read the OffsetResponse 
 +    val topicCount = buffer.getInt
 +    val pairs = (1 to topicCount).flatMap(_ => {
 +      val topic = readShortString(buffer)
 +      val partitionCount = buffer.getInt
 +      (1 to partitionCount).map(_ => {
 +        val partitionId = buffer.getInt
 +        val error = buffer.getShort
 +        (TopicAndPartition(topic, partitionId), error)
 +      })
 +    })
 +    OffsetCommitResponse(Map(pairs:_*), correlationId, clientId)
 +  }
 +}
 +
 +case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
-                                correlationId: Int = 0,
++                               override val correlationId: Int = 0,
 +                               clientId: String = OffsetCommitResponse.DefaultClientId)
-     extends RequestOrResponse {
++    extends RequestOrResponse(correlationId = correlationId) {
 +
 +  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 +
 +  def writeTo(buffer: ByteBuffer) {
 +    // Write envelope
 +    buffer.putInt(correlationId)
 +    writeShortString(buffer, clientId)
 +
 +    // Write OffsetCommitResponse
 +    buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
 +    requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, Short]
 +      writeShortString(buffer, t1._1) // topic
 +      buffer.putInt(t1._2.size)       // number of partitions for this topic
 +      t1._2.foreach( t2 => {  // TopicAndPartition -> Short
 +        buffer.putInt(t2._1.partition)
 +        buffer.putShort(t2._2)  //error
 +      })
 +    })
 +  }
 +
 +  override def sizeInBytes = 
 +    4 + /* correlationId */
 +    shortStringLength(clientId) +
 +    4 + /* topic count */
 +    requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
 +      val (topic, offsets) = topicAndOffsets
 +      count +
 +      shortStringLength(topic) + /* topic */
 +      4 + /* number of partitions */
 +      offsets.size * (
 +        4 + /* partition */
 +        2 /* error */
 +      )
 +    })
 +}
 +

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index fe94f17,0000000..a4c5623
mode 100644,000000..100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@@ -1,101 -1,0 +1,101 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + * 
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package kafka.api
 +
 +import java.nio.ByteBuffer
 +
 +import kafka.api.ApiUtils._
 +import kafka.utils.Logging
 +import kafka.network.{BoundedByteBufferSend, RequestChannel}
 +import kafka.network.RequestChannel.Response
 +import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
 +object OffsetFetchRequest extends Logging {
 +  val CurrentVersion: Short = 0
 +  val DefaultClientId = ""
 +
 +  def readFrom(buffer: ByteBuffer): OffsetFetchRequest = {
 +    // Read values from the envelope
 +    val versionId = buffer.getShort
 +    val correlationId = buffer.getInt
 +    val clientId = readShortString(buffer)
 +
 +    // Read the OffsetFetchRequest
 +    val consumerGroupId = readShortString(buffer)
 +    val topicCount = buffer.getInt
 +    val pairs = (1 to topicCount).flatMap(_ => {
 +      val topic = readShortString(buffer)
 +      val partitionCount = buffer.getInt
 +      (1 to partitionCount).map(_ => {
 +        val partitionId = buffer.getInt
 +        TopicAndPartition(topic, partitionId)
 +      })
 +    })
 +    OffsetFetchRequest(consumerGroupId, pairs, versionId, correlationId, clientId)
 +  }
 +}
 +
 +case class OffsetFetchRequest(groupId: String,
 +                               requestInfo: Seq[TopicAndPartition],
 +                               versionId: Short = OffsetFetchRequest.CurrentVersion,
-                                correlationId: Int = 0,
++                               override val correlationId: Int = 0,
 +                               clientId: String = OffsetFetchRequest.DefaultClientId)
-     extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey)) {
++    extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey), correlationId) {
 +
 +  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic)
 +  
 +  def writeTo(buffer: ByteBuffer) {
 +    // Write envelope
 +    buffer.putShort(versionId)
 +    buffer.putInt(correlationId)
 +    writeShortString(buffer, clientId)
 +
 +    // Write OffsetFetchRequest
 +    writeShortString(buffer, groupId)             // consumer group
 +    buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
 +    requestInfoGroupedByTopic.foreach( t1 => { // (topic, Seq[TopicAndPartition])
 +      writeShortString(buffer, t1._1) // topic
 +      buffer.putInt(t1._2.size)       // number of partitions for this topic
 +      t1._2.foreach( t2 => {
 +        buffer.putInt(t2.partition)
 +      })
 +    })
 +  }
 +
 +  override def sizeInBytes =
 +    2 + /* versionId */
 +    4 + /* correlationId */
 +    shortStringLength(clientId) +
 +    shortStringLength(groupId) + 
 +    4 + /* topic count */
 +    requestInfoGroupedByTopic.foldLeft(0)((count, t) => {
 +      count + shortStringLength(t._1) + /* topic */
 +      4 + /* number of partitions */
 +      t._2.size * 4 /* partition */
 +    })
 +
 +  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
 +    val responseMap = requestInfo.map {
 +      case (topicAndPartition) => (topicAndPartition, OffsetMetadataAndError(
 +        offset=OffsetMetadataAndError.InvalidOffset,
 +        error=ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
 +      ))
 +    }.toMap
 +    val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId)
 +    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/api/OffsetFetchResponse.scala
index 3d4ce2a,0000000..71c2efb
mode 100644,000000..100644
--- a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
@@@ -1,96 -1,0 +1,96 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + * 
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package kafka.api
 +
 +import java.nio.ByteBuffer
 +
 +import kafka.api.ApiUtils._
 +import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
 +import kafka.utils.Logging
 +
 +object OffsetFetchResponse extends Logging {
 +  val CurrentVersion: Short = 0
 +  val DefaultClientId = ""
 +
 +  def readFrom(buffer: ByteBuffer): OffsetFetchResponse = {
 +    // Read values from the envelope
 +    val correlationId = buffer.getInt
 +    val clientId = readShortString(buffer)
 +
 +    // Read the OffsetResponse 
 +    val topicCount = buffer.getInt
 +    val pairs = (1 to topicCount).flatMap(_ => {
 +      val topic = readShortString(buffer)
 +      val partitionCount = buffer.getInt
 +      (1 to partitionCount).map(_ => {
 +        val partitionId = buffer.getInt
 +        val offset = buffer.getLong
 +        val metadata = readShortString(buffer)
 +        val error = buffer.getShort
 +        (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata, error))
 +      })
 +    })
 +    OffsetFetchResponse(Map(pairs:_*), correlationId, clientId)
 +  }
 +}
 +
 +case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
-                                correlationId: Int = 0,
++                               override val correlationId: Int = 0,
 +                               clientId: String = OffsetFetchResponse.DefaultClientId)
-     extends RequestOrResponse {
++    extends RequestOrResponse(correlationId = correlationId) {
 +
 +  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 +
 +  def writeTo(buffer: ByteBuffer) {
 +    // Write envelope
 +    buffer.putInt(correlationId)
 +    writeShortString(buffer, clientId)
 +
 +    // Write OffsetFetchResponse
 +    buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
 +    requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, OffsetMetadataAndError]
 +      writeShortString(buffer, t1._1) // topic
 +      buffer.putInt(t1._2.size)       // number of partitions for this topic
 +      t1._2.foreach( t2 => { // TopicAndPartition -> OffsetMetadataAndError 
 +        buffer.putInt(t2._1.partition)
 +        buffer.putLong(t2._2.offset)
 +        writeShortString(buffer, t2._2.metadata)
 +        buffer.putShort(t2._2.error)
 +      })
 +    })
 +  }
 +
 +  override def sizeInBytes = 
 +    4 + /* correlationId */
 +    shortStringLength(clientId) +
 +    4 + /* topic count */
 +    requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
 +      val (topic, offsets) = topicAndOffsets
 +      count +
 +      shortStringLength(topic) + /* topic */
 +      4 + /* number of partitions */
 +      offsets.foldLeft(0)((innerCount, offsetsAndMetadata) => {
 +        innerCount +
 +        4 /* partition */ +
 +        8 /* offset */ +
 +        shortStringLength(offsetsAndMetadata._2.metadata) +
 +        2 /* error */
 +      })
 +    })
 +}
 +

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/cluster/Partition.scala
index 367ccd5,6e73003..f79a622
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@@ -25,8 -23,9 +25,9 @@@ import kafka.log.LogConfi
  import kafka.server.ReplicaManager
  import com.yammer.metrics.core.Gauge
  import kafka.metrics.KafkaMetricsGroup
 -import kafka.common.ErrorMapping
 +import kafka.common._
  import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
+ import org.apache.log4j.Logger
  
  
  /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/Log.scala
index 631953f,7d71451..d1c3d72
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@@ -206,78 -255,84 +206,80 @@@ class Log(val dir: File
     * This method will generally be responsible for assigning offsets to the messages, 
     * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
     * 
 -   * Returns a tuple containing (first_offset, last_offset) for the newly appended of the message set, 
 -   * or (-1,-1) if the message set is empty
 +   * @param messages The message set to append
 +   * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
 +   * 
 +   * @throws KafkaStorageException If the append fails due to an I/O error.
 +   * 
-    * @return Information about the appended messages including the first and last offset
++   * @return Information about the appended messages including the first and last offset.
     */
 -  def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): (Long, Long) = {
 -    val messageSetInfo = analyzeAndValidateMessageSet(messages)
 -
 +  def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
 +    val appendInfo = analyzeAndValidateMessageSet(messages)
 +    
      // if we have any valid messages, append them to the log
-     if(appendInfo.count == 0)
 -    if(messageSetInfo.count == 0) {
 -      (-1L, -1L)
 -    } else {
 -      // trim any invalid bytes or partial messages before appending it to the on-disk log
 -      var validMessages = trimInvalidBytes(messages)
++    if(appendInfo.shallowCount == 0)
 +      return appendInfo
 +      
 +    // trim any invalid bytes or partial messages before appending it to the on-disk log
 +    var validMessages = trimInvalidBytes(messages)
 +
 +    try {
 +      // they are valid, insert them in the log
 +      lock synchronized {
++        appendInfo.firstOffset = nextOffset.get
+ 
 -      try {
 -        // they are valid, insert them in the log
 -        val offsets = lock synchronized {
 -          val firstOffset = nextOffset.get
 +        // maybe roll the log if this segment is full
 +        val segment = maybeRoll()
-           
+ 
 -          // maybe roll the log if this segment is full
 -          val segment = maybeRoll(segments.view.last)
 -          
 +        if(assignOffsets) {
            // assign offsets to the messageset
-           appendInfo.firstOffset = nextOffset.get
 -          val lastOffset =
 -            if(assignOffsets) {
 -              val offsetCounter = new AtomicLong(nextOffset.get)
 -              try {
 -                validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
 -              } catch {
 -                case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
 -              }
 -              val assignedLastOffset = offsetCounter.get - 1
 -              val numMessages = assignedLastOffset - firstOffset + 1
 -              BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(numMessages)
 -              BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numMessages)
 -              assignedLastOffset
 -            } else {
 -              require(messageSetInfo.offsetsMonotonic, "Out of order offsets found in " + messages)
 -              require(messageSetInfo.firstOffset >= nextOffset.get, 
 -                      "Attempt to append a message set beginning with offset %d to a log with log end offset %d."
 -                      .format(messageSetInfo.firstOffset, nextOffset.get))
 -              messageSetInfo.lastOffset
 -            }
 -
 -          // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
 -          // happens with the new message size (after re-compression, if any)
 -          for(messageAndOffset <- validMessages.shallowIterator) {
 -            if(MessageSet.entrySize(messageAndOffset.message) > maxMessageSize)
 -              throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
 -                .format(MessageSet.entrySize(messageAndOffset.message), maxMessageSize))
 +          val offset = new AtomicLong(nextOffset.get)
 +          try {
 +            validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
 +          } catch {
 +            case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
            }
 +          appendInfo.lastOffset = offset.get - 1
 +        } else {
 +          // we are taking the offsets we are given
 +          if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get)
 +            throw new IllegalArgumentException("Out of order offsets found in " + messages)
 +        }
  
 -          // now append to the log
 -          segment.append(firstOffset, validMessages)
 -          
 -          // advance the log end offset
 -          nextOffset.set(lastOffset + 1)
 +        // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
 +        // happens with the new message size (after re-compression, if any)
 +        for(messageAndOffset <- validMessages.shallowIterator) {
 +          if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize)
 +            throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
 +              .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
 +        }
  
 -          trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
 -                  .format(this.name, firstOffset, nextOffset.get(), validMessages))
 +        // now append to the log
-         trace("Appending message set to %s with offsets %d to %d.".format(name, appendInfo.firstOffset, appendInfo.lastOffset))
 +        segment.append(appendInfo.firstOffset, validMessages)
-         
+ 
 -          // return the offset at which the messages were appended
 -          (firstOffset, lastOffset)
 -        }
 -        
 -        // maybe flush the log and index
 -        val numAppendedMessages = (offsets._2 - offsets._1 + 1).toInt
 -        maybeFlush(numAppendedMessages)
 -        
 -        // return the first and last offset
 -        offsets
 -      } catch {
 -        case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
 +        // increment the log end offset
 +        nextOffset.set(appendInfo.lastOffset + 1)
-           
-         // maybe flush the log and index
-         maybeFlush(appendInfo.count)
-         
++
++        trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
++                .format(this.name, appendInfo.firstOffset, nextOffset.get(), validMessages))
++
++        maybeFlush(appendInfo.shallowCount)
++
 +        appendInfo
        }
 +    } catch {
 +      case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
      }
    }
    
 -  /* struct to hold various quantities we compute about each message set before appending to the log */
 -  case class MessageSetAppendInfo(firstOffset: Long, lastOffset: Long, codec: CompressionCodec, count: Int, offsetsMonotonic: Boolean)
 +  /** Struct to hold various quantities we compute about each message set before appending to the log
 +   * @param firstOffset The first offset in the message set
 +   * @param lastOffset The last offset in the message set
 +   * @param codec The codec used in the message set
 +   * @param count The number of messages
 +   * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
 +   */
-   case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, count: Int, offsetsMonotonic: Boolean)
++  case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, offsetsMonotonic: Boolean)
    
    /**
     * Validate the following:

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/KafkaApis.scala
index f8faf96,87ca6b0..d7d8bbd
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@@ -196,18 -190,16 +196,22 @@@ class KafkaApis(val requestChannel: Req
        try {
          val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
          val log = localReplica.log.get
 -        val (start, end) = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
 +        val info = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
-         
++        val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1)
++
 +        // update stats
-         BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(info.count)
-         BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(info.count)
++        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages)
++        BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
 +        
          // we may need to increment high watermark since ISR could be down to 1
          localReplica.partition.maybeIncrementLeaderHW(localReplica)
          trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
 -              .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, start, end))
 -        ProduceResult(topicAndPartition, start, end)
 +              .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
 +        ProduceResult(topicAndPartition, info.firstOffset, info.lastOffset)
        } catch {
+         // NOTE: Failed produce requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException
+         // since failed produce requests metric is supposed to indicate failure of a broker in handling a produce request
+         // for a partition it is the leader for
          case e: KafkaStorageException =>
            fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
            Runtime.getRuntime.halt(1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/KafkaConfig.scala
index 5e4c9ca,549b4b0..96cbd62
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@@ -145,11 -110,8 +145,11 @@@ class KafkaConfig private (val props: V
    val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue))
  
    /* the number of messages accumulated on a log partition before messages are flushed to disk */
-   val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 500, (1, Int.MaxValue))
+   val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 10000, (1, Int.MaxValue))
  
 +  /* the amount of time to wait before deleting a file from the filesystem */
 +  val logDeleteDelayMs = props.getLongInRange("log.segment.delete.delay.ms", 60000, (0, Long.MaxValue))
 +
    /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
    val logFlushIntervalMsPerTopicMap = props.getMap("log.flush.interval.ms.per.topic", _.toInt > 0).mapValues(_.toInt)
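
For reference, a sketch (not part of this patch) of setting the two properties touched
above; the property names come from the getter calls in the hunk and the values shown are
simply the new defaults:

    object BrokerFlushSettingsSketch {
      def main(args: Array[String]) {
        val props = new java.util.Properties()
        props.put("log.flush.interval.messages", "10000") // default raised from 500 to 10000 in this patch
        props.put("log.segment.delete.delay.ms", "60000") // new setting introduced in this patch (60s default)
      }
    }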
  

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 29d0af7,d4f15c1..99e6f4e
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@@ -63,21 -63,38 +63,38 @@@ class ReplicaFetcherThread(name:String
      }
    }
  
-   // handle a partition whose offset is out of range and return a new fetch offset
+   /**
+    * Handle a partition whose offset is out of range and return a new fetch offset.
+    */
    def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
-     // This means the local replica is out of date. Truncate the log and catch up from beginning.
-     val request = OffsetRequest(
-       replicaId = brokerConfig.brokerId,
-       requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))
-     )
-     val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
-     val offset = partitionErrorAndOffset.error match {
-       case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
-       case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
-     }
      val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
-     replica.log.get.truncateFullyAndStartAt(offset)
-     offset
+     val log = replica.log.get
+ 
+     /**
+      * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
+      * and before it has completely caught up with the leader's logs, all replicas in the ISR go down. The follower is now uncleanly
+      * elected as the new leader, and it starts appending messages from the client. The old leader comes back up, becomes a follower
+      * and it may discover that the current leader's end offset is behind its own end offset.
+      *
+      * In such a case, truncate the current follower's log to the current leader's end offset and continue fetching.
+      *
+      * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
+      */
+     val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
+     if (leaderEndOffset < log.logEndOffset) {
+       log.truncateTo(leaderEndOffset)
+       leaderEndOffset
+     } else {
+       /**
+        * The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
+        * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
+        *
+        * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.
+        */
+       val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
 -      log.truncateAndStartWithNewOffset(leaderStartOffset)
++      log.truncateFullyAndStartAt(leaderStartOffset)
+       leaderStartOffset
+     }
    }
  
    // any logic for partitions whose leader has changed
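
In schematic form (not part of the patch), the decision the rewritten handleOffsetOutOfRange
makes, assuming leaderEndOffset and leaderStartOffset were fetched with
earliestOrLatestOffset as in the hunk above:

    object OffsetOutOfRangeSketch {
      // Condensed illustration of the two out-of-range cases handled above.
      def newFetchOffset(logEndOffset: Long, leaderEndOffset: Long, leaderStartOffset: Long): Long =
        if (leaderEndOffset < logEndOffset)
          leaderEndOffset   // follower ran ahead (unclean leader election): truncate down to the leader's end offset
        else
          leaderStartOffset // follower fell behind the leader's retained log: restart from the leader's start offset
    }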

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/ReplicaManager.scala
index 765d3cb,68e712c..477f60e
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@@ -48,10 -48,11 +50,11 @@@ class ReplicaManager(val config: KafkaC
    private var leaderPartitions = new mutable.HashSet[Partition]()
    private val leaderPartitionsLock = new Object
    val replicaFetcherManager = new ReplicaFetcherManager(config, this)
-   this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
    private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
 -  val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
 +  val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
    private var hwThreadInitialized = false
+   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
+   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
  
    newGauge(
      "LeaderCount",
@@@ -174,8 -180,8 +182,8 @@@
          partition.leaderReplicaIfLocal match {
            case Some(leaderReplica) => leaderReplica
            case None =>
-             throw new LeaderNotAvailableException("Leader not local for topic %s partition %d on broker %d"
-                                                   .format(topic, partitionId, config.brokerId))
+             throw new NotLeaderForPartitionException("Leader not local for topic %s partition %d on broker %d"
 -                    .format(topic, partitionId, config.brokerId))
++                                                     .format(topic, partitionId, config.brokerId))
          }
      }
    }

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/tools/DumpLogSegments.scala
index d9546ca,06e6437..5231e7c
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@@ -119,32 -123,34 +123,35 @@@ object DumpLogSegments 
      val messageSet = new FileMessageSet(file)
      var validBytes = 0L
      var lastOffset = -1l
-     for(messageAndOffset <- messageSet) {
-       val msg = messageAndOffset.message
+     for(shallowMessageAndOffset <- messageSet) { // this only does shallow iteration
+       val itr = getIterator(shallowMessageAndOffset, isDeepIteration)
+       for (messageAndOffset <- itr) {
+         val msg = messageAndOffset.message
  
-       if(lastOffset == -1)
+         if(lastOffset == -1)
+           lastOffset = messageAndOffset.offset
+         // If we are iterating uncompressed messages, offsets must be consecutive
+         else if (msg.compressionCodec == NoCompressionCodec && messageAndOffset.offset != lastOffset +1) {
+           var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getName, List[(Long, Long)]())
+           nonConsecutivePairsSeq ::=(lastOffset, messageAndOffset.offset)
+           nonConsecutivePairsForLogFilesMap.put(file.getName, nonConsecutivePairsSeq)
+         }
          lastOffset = messageAndOffset.offset
-       // If it's uncompressed message, its offset must be lastOffset + 1 no matter last message is compressed or uncompressed
-       else if (msg.compressionCodec == NoCompressionCodec && messageAndOffset.offset != lastOffset +1) {
-         var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getName, List[(Long, Long)]())
-         nonConsecutivePairsSeq ::=(lastOffset, messageAndOffset.offset)
-         nonConsecutivePairsForLogFilesMap.put(file.getName, nonConsecutivePairsSeq)
-       }
-       lastOffset = messageAndOffset.offset
  
-       print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
-             " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
-             " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
-       validBytes += MessageSet.entrySize(msg)
-       if(msg.hasKey)
-         print(" keysize: " + msg.keySize)
-       if(printContents) {
+         print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
+               " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
+               " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
          if(msg.hasKey)
-           print(" key: " + Utils.readString(messageAndOffset.message.key, "UTF-8"))
-         val payload = if(messageAndOffset.message.isNull) null else Utils.readString(messageAndOffset.message.payload, "UTF-8")
-         print(" payload: " + payload)
+           print(" keysize: " + msg.keySize)
+         if(printContents) {
+           if(msg.hasKey)
 -            print(" key: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
 -          print(" payload: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
++            print(" key: " + Utils.readString(messageAndOffset.message.key, "UTF-8"))
++          val payload = if(messageAndOffset.message.isNull) null else Utils.readString(messageAndOffset.message.payload, "UTF-8")
++          print(" payload: " + payload)
+         }
+         println()
        }
-       println()
+       validBytes += MessageSet.entrySize(shallowMessageAndOffset.message)
      }
      val trailingBytes = messageSet.sizeInBytes - validBytes
      if(trailingBytes > 0)

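A note on the DumpLogSegments hunk above: the outer loop now walks the file shallowly and expands each entry on demand (when deep iteration is requested, a compressed wrapper yields its inner messages), while validBytes still advances by the shallow entry size so reported positions match the on-disk layout. Below is a minimal, self-contained sketch of that shallow-then-deep pattern; the Entry type and expand/dump helpers are hypothetical stand-ins, not the real MessageAndOffset / ByteBufferMessageSet APIs.

    object ShallowDeepIterationSketch {
      // Hypothetical stand-in for a (possibly compressed) shallow entry.
      case class Entry(offset: Long, compressed: Boolean, inner: Seq[Entry], sizeInBytes: Int)

      // Expand one shallow entry: its wrapped messages if it is compressed and deep
      // iteration was requested, otherwise the entry itself.
      def expand(e: Entry, deep: Boolean): Iterator[Entry] =
        if (deep && e.compressed) e.inner.iterator else Iterator.single(e)

      def dump(shallowEntries: Seq[Entry], deep: Boolean): Long = {
        var validBytes = 0L
        for (shallow <- shallowEntries) {        // shallow pass over the segment
          for (msg <- expand(shallow, deep))     // per-entry deep pass
            println("offset: " + msg.offset + " position: " + validBytes)
          validBytes += shallow.sizeInBytes      // advance by the shallow entry, as the tool does
        }
        validBytes
      }

      def main(args: Array[String]): Unit = {
        val wrapper = Entry(2, compressed = true,
                            inner = Seq(Entry(1, false, Nil, 30), Entry(2, false, Nil, 30)),
                            sizeInBytes = 80)
        dump(Seq(Entry(0, false, Nil, 40), wrapper), deep = true)
      }
    }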
http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index aa5e661,3cfa384..7629329
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@@ -196,7 -214,8 +214,8 @@@ object SimpleConsumerShell extends Logg
                    System.out.println("next offset = " + offset)
                  val message = messageAndOffset.message
                  val key = if(message.hasKey) Utils.readBytes(message.key) else null
 -                formatter.writeTo(key, Utils.readBytes(message.payload), System.out)
 +                formatter.writeTo(key, if(message.isNull) null else Utils.readBytes(message.payload), System.out)
+                 numMessagesConsumed += 1
                } catch {
                  case e =>
                    if (skipMessageOnError)

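For the SimpleConsumerShell hunk above: the new line passes null through to the formatter when message.isNull, instead of trying to read bytes from a missing payload (such messages appear, for example, as delete markers on compacted topics). A small sketch of that guard follows, using a hypothetical Msg type in place of kafka.message.Message and Utils.readBytes.

    object NullPayloadSketch {
      // Hypothetical message: key and payload may be absent (a missing payload is a tombstone).
      case class Msg(key: Option[Array[Byte]], payload: Option[Array[Byte]])

      // Mirror of the shell's guard: a missing payload is passed through as null
      // rather than being dereferenced.
      def format(msg: Msg): String = {
        val key   = msg.key.map(bytes => new String(bytes, "UTF-8")).orNull
        val value = msg.payload.map(bytes => new String(bytes, "UTF-8")).orNull
        "key: " + key + " payload: " + value
      }

      def main(args: Array[String]): Unit = {
        println(format(Msg(Some("k".getBytes("UTF-8")), None)))                        // tombstone
        println(format(Msg(Some("k".getBytes("UTF-8")), Some("v".getBytes("UTF-8")))))
      }
    }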
http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/utils/Utils.scala
index c8fdf4a,c639efb..8692abc
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@@ -22,8 -22,10 +22,9 @@@ import java.nio.
  import charset.Charset
  import java.nio.channels._
  import java.lang.management._
 -import java.util.zip.CRC32
  import javax.management._
  import scala.collection._
+ import mutable.ListBuffer
  import scala.collection.mutable
  import java.util.Properties
  import kafka.common.KafkaException

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 5a489f9,0000000..15e9b60
mode 100644,000000..100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@@ -1,117 -1,0 +1,117 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + * 
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package kafka.log
 +
 +import java.io.File
 +import scala.collection._
 +import org.junit._
 +import kafka.common.TopicAndPartition
 +import kafka.utils._
 +import kafka.message._
 +import org.scalatest.junit.JUnitSuite
 +import junit.framework.Assert._
 +
 +/**
 + * This is an integration test that tests the fully integrated log cleaner
 + */
 +class LogCleanerIntegrationTest extends JUnitSuite {
 +  
 +  val time = new MockTime()
 +  val segmentSize = 100
 +  val deleteDelay = 1000
 +  val logName = "log"
 +  val logDir = TestUtils.tempDir()
 +  var counter = 0
 +  val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2))
 +  
 +  @Test
 +  def cleanerTest() {
 +    val cleaner = makeCleaner(parts = 3)
 +    val log = cleaner.logs.get(topics(0))
 +
 +    val appends = writeDups(numKeys = 100, numDups = 3, log)
 +    val startSize = log.size
 +    cleaner.startup()
 +    
 +    val lastCleaned = log.activeSegment.baseOffset
 +    // wait until we clean up to base_offset of active segment - minDirtyMessages
 +    cleaner.awaitCleaned("log", 0, lastCleaned)
 +    
 +    val read = readFromLog(log)
 +    assertEquals("Contents of the map shouldn't change.", appends.toMap, read.toMap)
 +    assertTrue(startSize > log.size)
 +    
 +    // write some more stuff and validate again
 +    val appends2 = appends ++ writeDups(numKeys = 100, numDups = 3, log)
 +    val lastCleaned2 = log.activeSegment.baseOffset
 +    cleaner.awaitCleaned("log", 0, lastCleaned2)
 +    val read2 = readFromLog(log)
 +    assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap)
 +    
 +    cleaner.shutdown()
 +  }
 +  
 +  def readFromLog(log: Log): Iterable[(Int, Int)] = {
 +    for(segment <- log.logSegments; message <- segment.log) yield {
 +      val key = Utils.readString(message.message.key).toInt
 +      val value = Utils.readString(message.message.payload).toInt
 +      key -> value
 +    }
 +  }
 +  
 +  def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
 +    for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
 +      val count = counter
-       val appendInfo = log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
++      log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
 +      counter += 1
 +      (key, count)
 +    }
 +  }
 +    
 +  @After
 +  def teardown() {
 +    Utils.rm(logDir)
 +  }
 +  
 +  /* create a cleaner instance and logs with the given parameters */
 +  def makeCleaner(parts: Int, 
 +                  minDirtyMessages: Int = 0, 
 +                  numThreads: Int = 1,
 +                  defaultPolicy: String = "dedupe",
 +                  policyOverrides: Map[String, String] = Map()): LogCleaner = {
 +    
 +    // create partitions and add them to the pool
 +    val logs = new Pool[TopicAndPartition, Log]()
 +    for(i <- 0 until parts) {
 +      val dir = new File(logDir, "log-" + i)
 +      dir.mkdirs()
 +      val log = new Log(dir = dir,
 +                        LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, dedupe = true),
 +                        needsRecovery = false,
 +                        scheduler = time.scheduler,
 +                        time = time)
 +      logs.put(TopicAndPartition("log", i), log)      
 +    }
 +  
 +    new LogCleaner(CleanerConfig(numThreads = numThreads),
 +                   logDirs = Array(logDir),
 +                   logs = logs,
 +                   time = time)
 +  }
 +
 +}
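A note on the assertions in LogCleanerIntegrationTest above: writeDups appends several values per key, and the comparison against appends.toMap relies on the fact that converting a sequence of pairs to a Map keeps the last binding for each duplicate key, which is the record the dedupe cleaner is expected to retain. A standalone illustration of that property (plain Scala, no Kafka types):

    object LastWriteWinsSketch {
      def main(args: Array[String]): Unit = {
        // Three rounds of writes for keys 0..2, in the spirit of writeDups(numKeys = 3, numDups = 3, log).
        val appends = for (dup <- 0 until 3; key <- 0 until 3) yield (key, dup * 10 + key)

        // toMap keeps the last value seen per key, i.e. what survives after deduplication.
        println(appends.toMap)   // Map(0 -> 20, 1 -> 21, 2 -> 22)
      }
    }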

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/log/LogTest.scala
index 5658ed4,4ed88e8..7d41938
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@@ -212,10 -151,10 +212,10 @@@ class LogTest extends JUnitSuite 
    @Test
    def testLogRolls() {
      /* create a multipart log with 100 messages */
 -    val log = new Log(logDir, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
 +    val log = new Log(logDir, logConfig.copy(segmentSize = 100), needsRecovery = false, time.scheduler, time = time)
      val numMessages = 100
      val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
-     val offsets = messageSets.map(log.append(_).firstOffset)
 -    val offsets = messageSets.map(log.append(_)._1)
++    messageSets.foreach(log.append(_))
      log.flush
  
      /* do successive reads to ensure all our messages are there */

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/731ba900/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------

