kafka-commits mailing list archives

From jjko...@apache.org
Subject [4/5] KAFKA-1012; Consumer offset management in Kafka; patched by Tejas Patil and Joel Koshy; feedback and reviews from Neha Narkhede, Jun Rao, Guozhang Wang, Sriram Subramanian, Joe Stein, Chris Riccomini
Date Fri, 14 Mar 2014 22:14:50 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/producer/ProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala
index 7947b18..3cdf23d 100644
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala
@@ -77,16 +77,7 @@ class ProducerConfig private (val props: VerifiableProperties)
    * This parameter allows you to specify the compression codec for all data generated *
    * by this producer. The default is NoCompressionCodec
    */
-  val compressionCodec = {
-    val prop = props.getString("compression.codec", NoCompressionCodec.name)
-    try {
-      CompressionCodec.getCompressionCodec(prop.toInt)
-    }
-    catch {
-      case nfe: NumberFormatException =>
-        CompressionCodec.getCompressionCodec(prop)
-    }
-  }
+  val compressionCodec = props.getCompressionCodec("compression.codec", NoCompressionCodec)
 
   /** This parameter allows you to set whether compression should be turned *
    *  on for particular topics
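
The hunk above replaces the inline codec parsing with a getCompressionCodec() helper on the
properties wrapper (the same helper is reused further down for "offsets.topic.compression.codec").
A minimal sketch of the behaviour being factored out, mirroring the removed lines rather than the
committed VerifiableProperties implementation:

    import java.util.Properties
    import kafka.message.{CompressionCodec, NoCompressionCodec}

    object CompressionCodecParsingSketch {
      // Accept either the integer codec id (e.g. "1") or the codec name (e.g. "gzip"),
      // falling back to NoCompressionCodec when the property is absent.
      def parse(props: Properties, propName: String = "compression.codec"): CompressionCodec = {
        val prop = props.getProperty(propName, NoCompressionCodec.name)
        try CompressionCodec.getCompressionCodec(prop.toInt)
        catch {
          case _: NumberFormatException => CompressionCodec.getCompressionCodec(prop)
        }
      }
    }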

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 215ac36..1d9922b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -22,24 +22,23 @@ import kafka.api._
 import kafka.message._
 import kafka.network._
 import kafka.log._
-import kafka.utils.ZKGroupTopicDirs
 import scala.collection._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic._
 import kafka.metrics.KafkaMetricsGroup
-import org.I0Itec.zkclient.ZkClient
 import kafka.common._
-import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
+import kafka.utils.{Pool, SystemTime, Logging}
 import kafka.network.RequestChannel.Response
 import kafka.cluster.Broker
 import kafka.controller.KafkaController
-
+import org.I0Itec.zkclient.ZkClient
 
 /**
  * Logic to handle the various Kafka requests
  */
 class KafkaApis(val requestChannel: RequestChannel,
                 val replicaManager: ReplicaManager,
+                val offsetManager: OffsetManager,
                 val zkClient: ZkClient,
                 val brokerId: Int,
                 val config: KafkaConfig,
@@ -65,7 +64,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     try{
       trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
       request.requestId match {
-        case RequestKeys.ProduceKey => handleProducerRequest(request)
+        case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
         case RequestKeys.FetchKey => handleFetchRequest(request)
         case RequestKeys.OffsetsKey => handleOffsetRequest(request)
         case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
@@ -73,8 +72,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
         case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
         case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
-        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
+        case RequestKeys.OffsetCommitKey => handleProducerOrOffsetCommitRequest(request)
         case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
+        case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -97,7 +97,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     // stop serving data to clients for the topic being deleted
     val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
     try {
-      val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
+      val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager)
       val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error)
       requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
     } catch {
@@ -192,14 +192,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def producerRequestFromOffsetCommit(offsetCommitRequest: OffsetCommitRequest) = {
+    val msgs = offsetCommitRequest.filterLargeMetadata(config.offsetMetadataMaxSize).map {
+      case (topicAndPartition, offset) =>
+        new Message(
+          bytes = OffsetManager.offsetCommitValue(offset),
+          key = OffsetManager.offsetCommitKey(offsetCommitRequest.groupId, topicAndPartition.topic, topicAndPartition.partition)
+        )
+    }.toSeq
+
+    val producerData = mutable.Map(
+      TopicAndPartition(OffsetManager.OffsetsTopicName, offsetManager.partitionFor(offsetCommitRequest.groupId)) ->
+        new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, msgs:_*)
+    )
+
+    val request = ProducerRequest(
+      correlationId = offsetCommitRequest.correlationId,
+      clientId = offsetCommitRequest.clientId,
+      requiredAcks = config.offsetCommitRequiredAcks,
+      ackTimeoutMs = config.offsetCommitTimeoutMs,
+      data = producerData)
+    trace("Created producer request %s for offset commit request %s.".format(request, offsetCommitRequest))
+    request
+  }
+
   /**
-   * Handle a produce request
+   * Handle a produce request or offset commit request (which is really a specialized producer request)
    */
-  def handleProducerRequest(request: RequestChannel.Request) {
-    val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
+  def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
+
+    val (produceRequest, offsetCommitRequestOpt) = if (request.requestId == RequestKeys.OffsetCommitKey) {
+      val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+      (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
+    }
+    else {
+      (request.requestObj.asInstanceOf[ProducerRequest], None)
+    }
+
     val sTime = SystemTime.milliseconds
     val localProduceResults = appendToLocalLog(produceRequest)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+    val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)
 
     val numPartitionsInError = localProduceResults.count(_.error.isDefined)
     produceRequest.data.foreach(partitionAndData =>
@@ -218,14 +251,29 @@ class KafkaApis(val requestChannel: RequestChannel,
           .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
         requestChannel.closeConnection(request.processor, request)
       } else {
-        requestChannel.noOperation(request.processor, request)
+
+        if (firstErrorCode == ErrorMapping.NoError)
+          offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))
+
+        if (offsetCommitRequestOpt.isDefined) {
+          val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize)
+          requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+        } else
+          requestChannel.noOperation(request.processor, request)
       }
     } else if (produceRequest.requiredAcks == 1 ||
         produceRequest.numPartitions <= 0 ||
         allPartitionHaveReplicationFactorOne ||
         numPartitionsInError == produceRequest.numPartitions) {
+
+      if (firstErrorCode == ErrorMapping.NoError) {
+        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
+      }
+
       val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
-      val response = ProducerResponse(produceRequest.correlationId, statuses)
+      val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
+                                           .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
+
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
     } else {
       // create a list of (topic, partition) pairs to use as keys for this delayed request
@@ -233,12 +281,15 @@ class KafkaApis(val requestChannel: RequestChannel,
         topicAndPartition => new RequestKey(topicAndPartition)).toSeq
       val statuses = localProduceResults.map(r =>
         r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
-      val delayedProduce = new DelayedProduce(producerRequestKeys, 
-                                              request,
-                                              statuses,
-                                              produceRequest, 
-                                              produceRequest.ackTimeoutMs.toLong)
-      producerRequestPurgatory.watch(delayedProduce)
+      val delayedRequest =  new DelayedProduce(
+        producerRequestKeys,
+        request,
+        statuses,
+        produceRequest,
+        produceRequest.ackTimeoutMs.toLong,
+        offsetCommitRequestOpt)
+
+      producerRequestPurgatory.watch(delayedRequest)
 
       /*
        * Replica fetch requests may have arrived (and potentially satisfied)
@@ -252,6 +303,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       debug(satisfiedProduceRequests.size +
         " producer requests unblocked during produce to local log.")
       satisfiedProduceRequests.foreach(_.respond())
+
       // we do not need the data anymore
       produceRequest.emptyData()
     }
@@ -265,7 +317,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
         acksPending, status.error, status.offset, requiredOffset)
   }
-
+  
   case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) {
     def this(key: TopicAndPartition, throwable: Throwable) = 
       this(key, -1L, -1L, Some(throwable))
@@ -557,120 +609,85 @@ class KafkaApis(val requestChannel: RequestChannel,
     ret.toSeq.sortBy(- _)
   }
 
-  /**
-   * Service the topic metadata request API
-   */
-  def handleTopicMetadataRequest(request: RequestChannel.Request) {
-    val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
-    val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
+  private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
     val config = replicaManager.config
-    var uniqueTopics = Set.empty[String]
-    uniqueTopics = {
-      if(metadataRequest.topics.size > 0)
-        metadataRequest.topics.toSet
-      else {
-        partitionMetadataLock synchronized {
-          metadataCache.keySet.map(_.topic)
-        }
-      }
-    }
-    val topicMetadataList =
-      partitionMetadataLock synchronized {
-        uniqueTopics.map { topic =>
-          if(metadataCache.keySet.map(_.topic).contains(topic)) {
-            debug("Topic %s exists in metadata cache on broker %d".format(topic, config.brokerId))
-            val partitionStateInfo = metadataCache.filter(p => p._1.topic.equals(topic))
-            val sortedPartitions = partitionStateInfo.toList.sortWith((m1,m2) => m1._1.partition < m2._1.partition)
-            val partitionMetadata = sortedPartitions.map { case(topicAndPartition, partitionState) =>
-              val replicas = metadataCache(topicAndPartition).allReplicas
-              var replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
-              var leaderInfo: Option[Broker] = None
-              var isrInfo: Seq[Broker] = Nil
-              val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
-              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
-              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
-              debug("%s".format(topicAndPartition) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
-              try {
-                if(aliveBrokers.keySet.contains(leader))
-                  leaderInfo = Some(aliveBrokers(leader))
-                else throw new LeaderNotAvailableException("Leader not available for partition %s".format(topicAndPartition))
-                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
-                if(replicaInfo.size < replicas.size)
-                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
-                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-                if(isrInfo.size < isr.size)
-                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
-                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-                new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
-              } catch {
-                case e: Throwable =>
-                  error("Error while fetching metadata for partition %s".format(topicAndPartition), e)
-                  new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo,
-                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-              }
+
+    partitionMetadataLock synchronized {
+      topics.map { topic =>
+        if(metadataCache.keySet.map(_.topic).contains(topic)) {
+          val partitionStateInfo = metadataCache.filter(p => p._1.topic.equals(topic))
+          val sortedPartitions = partitionStateInfo.toList.sortWith((m1,m2) => m1._1.partition < m2._1.partition)
+          val partitionMetadata = sortedPartitions.map { case(topicAndPartition, partitionState) =>
+            val replicas = metadataCache(topicAndPartition).allReplicas
+            val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
+            var leaderInfo: Option[Broker] = None
+            var isrInfo: Seq[Broker] = Nil
+            val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
+            val leader = leaderIsrAndEpoch.leaderAndIsr.leader
+            val isr = leaderIsrAndEpoch.leaderAndIsr.isr
+            debug("%s".format(topicAndPartition) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
+            try {
+              if(aliveBrokers.keySet.contains(leader))
+                leaderInfo = Some(aliveBrokers(leader))
+              else throw new LeaderNotAvailableException("Leader not available for partition %s".format(topicAndPartition))
+              isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
+              if(replicaInfo.size < replicas.size)
+                throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+                  replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+              if(isrInfo.size < isr.size)
+                throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+                  isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+              new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+            } catch {
+              case e: Throwable =>
+                error("Error while fetching metadata for partition %s".format(topicAndPartition), e)
+                new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo,
+                  ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
             }
-            new TopicMetadata(topic, partitionMetadata)
-          } else {
-            debug("Topic %s does not exist in metadata cache on broker %d".format(topic, config.brokerId))
-            // topic doesn't exist, send appropriate error code
-            new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
           }
-        }
-      }
-
-    // handle auto create topics
-    topicMetadataList.foreach { topicMetadata =>
-      topicMetadata.errorCode match {
-        case ErrorMapping.NoError => topicsMetadata += topicMetadata
-        case ErrorMapping.UnknownTopicOrPartitionCode =>
-          if (config.autoCreateTopicsEnable) {
+          new TopicMetadata(topic, partitionMetadata)
+        } else {
+          // topic doesn't exist, send appropriate error code after handling auto create topics
+          val isOffsetsTopic = topic == OffsetManager.OffsetsTopicName
+          if (config.autoCreateTopicsEnable || isOffsetsTopic) {
             try {
-              AdminUtils.createTopic(zkClient, topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
+              if (isOffsetsTopic)
+                AdminUtils.createTopic(zkClient, topic,
+                  config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor, offsetManager.offsetsTopicConfig)
+              else
+                AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
               info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                .format(topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
+                .format(topic, config.numPartitions, config.defaultReplicationFactor))
             } catch {
               case e: TopicExistsException => // let it go, possibly another broker created this topic
             }
-            topicsMetadata += new TopicMetadata(topicMetadata.topic, topicMetadata.partitionsMetadata, ErrorMapping.LeaderNotAvailableCode)
+            new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
           } else {
-            debug("Auto create topic skipped for %s".format(topicMetadata.topic))
-            topicsMetadata += topicMetadata
+            new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
           }
-        case _ =>
-          debug("Error while fetching topic metadata for topic %s due to %s ".format(topicMetadata.topic,
-            ErrorMapping.exceptionFor(topicMetadata.errorCode).getClass.getName))
-          topicsMetadata += topicMetadata
+        }
       }
-    }
-    trace("Sending topic metadata %s for correlation id %d to client %s".format(topicsMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
-    val response = new TopicMetadataResponse(topicsMetadata.toSeq, metadataRequest.correlationId)
-    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+    }.toSeq
   }
 
-  /* 
-   * Service the Offset commit API
+  /**
+   * Service the topic metadata request API
    */
-  def handleOffsetCommitRequest(request: RequestChannel.Request) {
-    val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
-    val responseInfo = offsetCommitRequest.requestInfo.map{
-      case (topicAndPartition, metaAndError) => {
-        val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
-        try {
-          ensureTopicExists(topicAndPartition.topic)
-          if(metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
-            (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
-          } else {
-            ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
-              topicAndPartition.partition, metaAndError.offset.toString)
-            (topicAndPartition, ErrorMapping.NoError)
-          }
-        } catch {
-          case e: Throwable => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+  def handleTopicMetadataRequest(request: RequestChannel.Request) {
+    val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
+    var uniqueTopics = Set.empty[String]
+    uniqueTopics = {
+      if(metadataRequest.topics.size > 0)
+        metadataRequest.topics.toSet
+      else {
+        partitionMetadataLock synchronized {
+          metadataCache.keySet.map(_.topic)
         }
       }
     }
-    val response = new OffsetCommitResponse(responseInfo, 
-                                            offsetCommitRequest.correlationId)
+    val topicMetadata = getTopicMetadata(uniqueTopics)
+    trace("Sending topic metadata %s for correlation id %d to client %s".format(topicMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
+    val response = new TopicMetadataResponse(topicMetadata, metadataRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
@@ -679,26 +696,38 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleOffsetFetchRequest(request: RequestChannel.Request) {
     val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
-    val responseInfo = offsetFetchRequest.requestInfo.map( t => {
-      val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic)
-      try {
-        ensureTopicExists(t.topic)
-        val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + t.partition)._1
-        payloadOpt match {
-          case Some(payload) => {
-            (t, OffsetMetadataAndError(offset=payload.toLong, error=ErrorMapping.NoError))
-          } 
-          case None => (t, OffsetMetadataAndError(OffsetMetadataAndError.InvalidOffset, OffsetMetadataAndError.NoMetadata,
-                          ErrorMapping.UnknownTopicOrPartitionCode))
-        }
-      } catch {
-        case e: Throwable =>
-          (t, OffsetMetadataAndError(OffsetMetadataAndError.InvalidOffset, OffsetMetadataAndError.NoMetadata,
-             ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])))
-      }
-    })
-    val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), 
-                                           offsetFetchRequest.correlationId)
+
+    val status = offsetManager.getOffsets(offsetFetchRequest.groupId, offsetFetchRequest.requestInfo).toMap
+
+    val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId)
+
+    trace("Sending offset fetch response %s for correlation id %d to client %s."
+          .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId))
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+  }
+
+  /*
+   * Service the consumer metadata API
+   */
+  def handleConsumerMetadataRequest(request: RequestChannel.Request) {
+    val consumerMetadataRequest = request.requestObj.asInstanceOf[ConsumerMetadataRequest]
+
+    val partition = offsetManager.partitionFor(consumerMetadataRequest.group)
+
+    // get metadata (and create the topic if necessary)
+    val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName)).head
+
+    val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId)
+
+    val response =
+      offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).map { partitionMetadata =>
+        partitionMetadata.leader.map { leader =>
+          ConsumerMetadataResponse(Some(leader), ErrorMapping.NoError, consumerMetadataRequest.correlationId)
+        }.getOrElse(errorResponse)
+      }.getOrElse(errorResponse)
+
+    trace("Sending consumer metadata %s for correlation id %d to client %s."
+          .format(response, consumerMetadataRequest.correlationId, consumerMetadataRequest.clientId))
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
@@ -772,12 +801,13 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   class DelayedProduce(keys: Seq[RequestKey],
                        request: RequestChannel.Request,
-                       val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus],
-                       val produce: ProducerRequest,
-                       delayMs: Long)
+                       val partitionStatus: immutable.Map[TopicAndPartition, DelayedProduceResponseStatus],
+                       produce: ProducerRequest,
+                       delayMs: Long,
+                       offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
           extends DelayedRequest(keys, request, delayMs) with Logging {
 
-    // first update the acks pending variable according to error code
+    // first update the acks pending variable according to the error code
     partitionStatus foreach { case (topicAndPartition, delayedStatus) =>
       if (delayedStatus.status.error == ErrorMapping.NoError) {
         // Timeout error state will be cleared when requiredAcks are received
@@ -790,13 +820,21 @@ class KafkaApis(val requestChannel: RequestChannel,
       trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
     }
 
-
     def respond() {
       val responseStatus = partitionStatus.map { case (topicAndPartition, delayedStatus) =>
         topicAndPartition -> delayedStatus.status
       }
 
-      val response = ProducerResponse(produce.correlationId, responseStatus)
+      val errorCode = responseStatus.find { case (_, status) =>
+        status.error != ErrorMapping.NoError
+      }.map(_._2.error).getOrElse(ErrorMapping.NoError)
+
+      if (errorCode == ErrorMapping.NoError) {
+        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
+      }
+
+      val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, config.offsetMetadataMaxSize))
+                                           .getOrElse(ProducerResponse(produce.correlationId, responseStatus))
 
       requestChannel.sendResponse(new RequestChannel.Response(
         request, new BoundedByteBufferSend(response)))
@@ -828,7 +866,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             (false, ErrorMapping.UnknownTopicOrPartitionCode)
         }
         if (errorCode != ErrorMapping.NoError) {
-          fetchPartitionStatus.acksPending = false
+          fetchPartitionStatus. acksPending = false
           fetchPartitionStatus.status.error = errorCode
         } else if (hasEnough) {
           fetchPartitionStatus.acksPending = false
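
In the request path above, an OffsetCommitRequest is turned into a ProducerRequest against the
internal offsets topic, and handleConsumerMetadataRequest points a consumer group at the leader of
its offsets partition. The routing in both cases comes down to a hash of the group id, as in this
sketch (the bit mask stands in for kafka.utils.Utils.abs to keep the result non-negative, and the
50-partition count is only an example; the patch's default for offsets.topic.num.partitions is 1):

    object OffsetsPartitionSketch {
      // Which partition of __consumer_offsets holds a given group's offsets,
      // following the OffsetManager.partitionFor(group) logic in this patch.
      def partitionFor(group: String, offsetsTopicPartitions: Int): Int =
        (group.hashCode & 0x7fffffff) % offsetsTopicPartitions

      def main(args: Array[String]) {
        println(partitionFor("my-group", 50))
      }
    }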

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b871843..d07796e 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -58,7 +58,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val numIoThreads = props.getIntInRange("num.io.threads", 8, (1, Int.MaxValue))
   
   /* the number of threads to use for various background processing tasks */
-  val backgroundThreads = props.getIntInRange("background.threads", 4, (1, Int.MaxValue))
+  val backgroundThreads = props.getIntInRange("background.threads", 10, (1, Int.MaxValue))
   
   /* the number of queued requests allowed before blocking the network threads */
   val queuedMaxRequests = props.getIntInRange("queued.max.requests", 500, (1, Int.MaxValue))
@@ -242,10 +242,46 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val controlledShutdownRetryBackoffMs = props.getInt("controlled.shutdown.retry.backoff.ms", 5000)
 
   /* enable controlled shutdown of the server */
-  val controlledShutdownEnable = props.getBoolean("controlled.shutdown.enable", false)
+  val controlledShutdownEnable = props.getBoolean("controlled.shutdown.enable", default = false)
 
-  /*********** Misc configuration ***********/
+  /*********** Offset management configuration ***********/
   
   /* the maximum size for a metadata entry associated with an offset commit */
-  val offsetMetadataMaxSize = props.getInt("offset.metadata.max.bytes", 1024)
+  val offsetMetadataMaxSize = props.getInt("offset.metadata.max.bytes", OffsetManagerConfig.DefaultMaxMetadataSize)
+
+  /** Batch size for reading from the offsets segments when loading offsets into the cache. */
+  val offsetsLoadBufferSize = props.getIntInRange("offsets.load.buffer.size",
+    OffsetManagerConfig.DefaultLoadBufferSize, (1, Integer.MAX_VALUE))
+
+  /** The replication factor for the offset commit topic (set higher to ensure availability). */
+  val offsetsTopicReplicationFactor: Short = props.getShortInRange("offsets.topic.replication.factor",
+    OffsetManagerConfig.DefaultOffsetsTopicReplicationFactor, (1, Short.MaxValue))
+
+  /** The number of partitions for the offset commit topic (should not change after deployment). */
+  val offsetsTopicPartitions: Int = props.getIntInRange("offsets.topic.num.partitions",
+    OffsetManagerConfig.DefaultOffsetsTopicNumPartitions, (1, Integer.MAX_VALUE))
+
+  /** The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads */
+  val offsetsTopicSegmentBytes: Int = props.getIntInRange("offsets.topic.segment.bytes",
+    OffsetManagerConfig.DefaultOffsetsTopicSegmentBytes, (1, Integer.MAX_VALUE))
+
+  /** Compression codec for the offsets topic - compression may be used to achieve "atomic" commits. */
+  val offsetsTopicCompressionCodec = props.getCompressionCodec("offsets.topic.compression.codec",
+    OffsetManagerConfig.DefaultOffsetsTopicCompressionCodec)
+
+  /** Offsets older than this retention period will be discarded. */
+  val offsetsRetentionMinutes: Int = props.getIntInRange("offsets.retention.minutes", 24*60, (1, Integer.MAX_VALUE))
+
+  /** Frequency at which to check for stale offsets. */
+  val offsetsRetentionCheckIntervalMs: Long = props.getLongInRange("offsets.retention.check.interval.ms",
+    OffsetManagerConfig.DefaultOffsetsRetentionCheckIntervalMs, (1, Long.MaxValue))
+
+  /* Offset commit will be delayed until all replicas for the offsets topic receive the commit or this timeout is
+   * reached. This is similar to the producer request timeout. */
+  val offsetCommitTimeoutMs = props.getIntInRange("offsets.commit.timeout.ms",
+    OffsetManagerConfig.DefaultOffsetCommitTimeoutMs, (1, Integer.MAX_VALUE))
+
+  /** The required acks before the commit can be accepted. In general, the default (-1) should not be overridden. */
+  val offsetCommitRequiredAcks = props.getShortInRange("offsets.commit.required.acks",
+    OffsetManagerConfig.DefaultOffsetCommitRequiredAcks, (-1, offsetsTopicReplicationFactor))
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index feb2093..c208f83 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -40,11 +40,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   this.logIdent = "[Kafka Server " + config.brokerId + "], "
   private var isShuttingDown = new AtomicBoolean(false)
   private var shutdownLatch = new CountDownLatch(1)
-  private var startupComplete = new AtomicBoolean(false);
+  private var startupComplete = new AtomicBoolean(false)
   val correlationId: AtomicInteger = new AtomicInteger(0)
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
   var logManager: LogManager = null
+  var offsetManager: OffsetManager = null
   var kafkaHealthcheck: KafkaHealthcheck = null
   var topicConfigManager: TopicConfigManager = null
   var replicaManager: ReplicaManager = null
@@ -83,10 +84,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     socketServer.startup()
 
     replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
+
+    /* start offset manager */
+    offsetManager = createOffsetManager()
+
     kafkaController = new KafkaController(config, zkClient)
     
     /* start processing requests */
-    apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
+    apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
    
     Mx4jLoader.maybeLoad()
@@ -104,7 +109,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
     
     registerStats()
-    startupComplete.set(true);
+    startupComplete.set(true)
     info("started")
   }
   
@@ -215,7 +220,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
    */
   def shutdown() {
     info("shutting down")
-    val canShutdown = isShuttingDown.compareAndSet(false, true);
+    val canShutdown = isShuttingDown.compareAndSet(false, true)
     if (canShutdown) {
       Utils.swallow(controlledShutdown())
       if(kafkaHealthcheck != null)
@@ -224,6 +229,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         Utils.swallow(socketServer.shutdown())
       if(requestHandlerPool != null)
         Utils.swallow(requestHandlerPool.shutdown())
+      if(offsetManager != null)
+        offsetManager.shutdown()
       Utils.swallow(kafkaScheduler.shutdown())
       if(apis != null)
         Utils.swallow(apis.close())
@@ -237,7 +244,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         Utils.swallow(zkClient.close())
 
       shutdownLatch.countDown()
-      startupComplete.set(false);
+      startupComplete.set(false)
       info("shut down completed")
     }
   }
@@ -285,6 +292,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                    time = time)
   }
 
-}
+  private def createOffsetManager(): OffsetManager = {
+    val offsetManagerConfig = OffsetManagerConfig(
+      maxMetadataSize = config.offsetMetadataMaxSize,
+      loadBufferSize = config.offsetsLoadBufferSize,
+      offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
+      offsetsTopicNumPartitions = config.offsetsTopicPartitions,
+      offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
+      offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
+      offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
+    new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler)
+  }
 
+}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
new file mode 100644
index 0000000..89a88a7
--- /dev/null
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -0,0 +1,480 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils._
+import kafka.common._
+import java.nio.ByteBuffer
+import java.util.Properties
+import kafka.log.{FileMessageSet, LogConfig}
+import org.I0Itec.zkclient.ZkClient
+import scala.collection._
+import kafka.message._
+import java.util.concurrent.TimeUnit
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
+import scala.Some
+import kafka.common.TopicAndPartition
+import kafka.consumer.MessageFormatter
+import java.io.PrintStream
+import org.apache.kafka.common.protocol.types.{Struct, Schema, Field}
+import org.apache.kafka.common.protocol.types.Type.STRING
+import org.apache.kafka.common.protocol.types.Type.INT32
+import org.apache.kafka.common.protocol.types.Type.INT64
+import java.util.concurrent.atomic.AtomicBoolean
+
+
+/**
+ * Configuration settings for in-built offset management
+ * @param maxMetadataSize The maximum allowed metadata for any offset commit.
+ * @param loadBufferSize Batch size for reading from the offsets segments when loading offsets into the cache.
+ * @param offsetsRetentionMs Offsets older than this retention period will be discarded.
+ * @param offsetsRetentionCheckIntervalMs Frequency at which to check for stale offsets.
+ * @param offsetsTopicNumPartitions The number of partitions for the offset commit topic (should not change after deployment).
+ * @param offsetsTopicSegmentBytes The offsets topic segment bytes should be kept relatively small to facilitate faster
+ *                                 log compaction and faster offset loads
+ * @param offsetsTopicReplicationFactor The replication factor for the offset commit topic (set higher to ensure availability).
+ * @param offsetsTopicCompressionCodec Compression codec for the offsets topic - compression should be turned on in
+ *                                     order to achieve "atomic" commits.
+ * @param offsetCommitTimeoutMs The offset commit will be delayed until all replicas for the offsets topic receive the
+ *                              commit or this timeout is reached. (Similar to the producer request timeout.)
+ * @param offsetCommitRequiredAcks The required acks before the commit can be accepted. In general, the default (-1)
+ *                                 should not be overridden.
+ */
+case class OffsetManagerConfig(maxMetadataSize: Int = OffsetManagerConfig.DefaultMaxMetadataSize,
+                               loadBufferSize: Int = OffsetManagerConfig.DefaultLoadBufferSize,
+                               offsetsRetentionMs: Long = 24*60*60000L,
+                               offsetsRetentionCheckIntervalMs: Long = OffsetManagerConfig.DefaultOffsetsRetentionCheckIntervalMs,
+                               offsetsTopicNumPartitions: Int = OffsetManagerConfig.DefaultOffsetsTopicNumPartitions,
+                               offsetsTopicSegmentBytes: Int = OffsetManagerConfig.DefaultOffsetsTopicSegmentBytes,
+                               offsetsTopicReplicationFactor: Short = OffsetManagerConfig.DefaultOffsetsTopicReplicationFactor,
+                               offsetsTopicCompressionCodec: CompressionCodec = OffsetManagerConfig.DefaultOffsetsTopicCompressionCodec,
+                               offsetCommitTimeoutMs: Int = OffsetManagerConfig.DefaultOffsetCommitTimeoutMs,
+                               offsetCommitRequiredAcks: Short = OffsetManagerConfig.DefaultOffsetCommitRequiredAcks)
+
+object OffsetManagerConfig {
+  val DefaultMaxMetadataSize = 4096
+  val DefaultLoadBufferSize = 5*1024*1024
+  val DefaultOffsetsRetentionCheckIntervalMs = 600000L
+  val DefaultOffsetsTopicNumPartitions = 1
+  val DefaultOffsetsTopicSegmentBytes = 100*1024*1024
+  val DefaultOffsetsTopicReplicationFactor = 1.toShort
+  val DefaultOffsetsTopicCompressionCodec = NoCompressionCodec
+  val DefaultOffsetCommitTimeoutMs = 5000
+  val DefaultOffsetCommitRequiredAcks = (-1).toShort
+}
+
+class OffsetManager(val config: OffsetManagerConfig,
+                    replicaManager: ReplicaManager,
+                    zkClient: ZkClient,
+                    scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
+
+  /* offsets and metadata cache */
+  private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
+  private val followerTransitionLock = new Object
+
+  private val loadingPartitions: mutable.Set[Int] = mutable.Set()
+
+  private val shuttingDown = new AtomicBoolean(false)
+
+  scheduler.schedule(name = "offsets-cache-compactor",
+                     fun = compact,
+                     period = config.offsetsRetentionCheckIntervalMs,
+                     unit = TimeUnit.MILLISECONDS)
+
+  newGauge("NumOffsets",
+    new Gauge[Int] {
+      def value = offsetsCache.size
+    }
+  )
+
+  newGauge("NumGroups",
+    new Gauge[Int] {
+      def value = offsetsCache.keys.map(_.group).toSet.size
+    }
+  )
+
+  private def compact() {
+    debug("Compacting offsets cache.")
+    val startMs = SystemTime.milliseconds
+
+    val staleOffsets = offsetsCache.filter(startMs - _._2.timestamp > config.offsetsRetentionMs)
+
+    debug("Found %d stale offsets (older than %d ms).".format(staleOffsets.size, config.offsetsRetentionMs))
+
+    // delete the stale offsets from the table and generate tombstone messages to remove them from the log
+    val tombstonesForPartition = staleOffsets.map { case(groupTopicAndPartition, offsetAndMetadata) =>
+      val offsetsPartition = partitionFor(groupTopicAndPartition.group)
+      trace("Removing stale offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
+
+      offsetsCache.remove(groupTopicAndPartition)
+
+      val commitKey = OffsetManager.offsetCommitKey(groupTopicAndPartition.group,
+        groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
+
+      (offsetsPartition, new Message(bytes = null, key = commitKey))
+    }.groupBy{ case (partition, tombstone) => partition }
+
+    // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
+    // if we crash or leaders move) since the new leaders will get rid of stale offsets during their own purge cycles.
+    val numRemoved = tombstonesForPartition.flatMap { case(offsetsPartition, tombstones) =>
+      val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
+      partitionOpt.map { partition =>
+        val appendPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
+        val messages = tombstones.map(_._2).toSeq
+
+        trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
+
+        try {
+          partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
+          tombstones.size
+        }
+        catch {
+          case t: Throwable =>
+            error("Failed to mark %d stale offsets for deletion in %s.".format(messages.size, appendPartition), t)
+            // ignore and continue
+            0
+        }
+      }
+    }.sum
+
+    debug("Removed %d stale offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds - startMs))
+  }
+
+  def offsetsTopicConfig: Properties = {
+    val props = new Properties
+    props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString)
+    props.put(LogConfig.CleanupPolicyProp, "dedupe")
+    props
+  }
+
+  def partitionFor(group: String): Int = Utils.abs(group.hashCode) % config.offsetsTopicNumPartitions
+
+  /**
+   * Fetch the current offset for the given group/topic/partition from the underlying offsets storage.
+   *
+   * @param key The requested group-topic-partition
+   * @return If the key is present, return the offset and metadata; otherwise return None
+   */
+  private def getOffset(key: GroupTopicPartition) = {
+    val offsetAndMetadata = offsetsCache.get(key)
+    if (offsetAndMetadata == null)
+      OffsetMetadataAndError.NoOffset
+    else
+      OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError)
+  }
+
+  /**
+   * Put the (already committed) offset for the given group/topic/partition into the cache.
+   *
+   * @param key The group-topic-partition
+   * @param offsetAndMetadata The offset/metadata to be stored
+   */
+  private def putOffset(key: GroupTopicPartition, offsetAndMetadata: OffsetAndMetadata) {
+    offsetsCache.put(key, offsetAndMetadata)
+  }
+
+  def putOffsets(group: String, offsets: Map[TopicAndPartition, OffsetAndMetadata]) {
+    // this method is called _after_ the offsets have been durably appended to the commit log, so there is no need to
+    // check for current leadership as we do for the offset fetch
+    trace("Putting offsets %s for group %s in offsets partition %d.".format(offsets, group, partitionFor(group)))
+    offsets.foreach { case (topicAndPartition, offsetAndMetadata) =>
+      putOffset(GroupTopicPartition(group, topicAndPartition), offsetAndMetadata)
+    }
+  }
+
+  /**
+   * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
+   * returns the current offset or it begins to sync the cache from the log (and returns an error code).
+   */
+  def getOffsets(group: String, topicPartitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = {
+    trace("Getting offsets %s for group %s.".format(topicPartitions, group))
+
+    val offsetsPartition = partitionFor(group)
+
+    /**
+     * followerTransitionLock protects against fetching from an empty/cleared offset cache (i.e., one cleared due to
+     * a leader->follower transition): even if leader-is-local is true, a follower transition can occur right after
+     * the check and clear the cache, in which case we would read from the empty cache and incorrectly return NoOffset.
+     */
+    followerTransitionLock synchronized {
+      if (leaderIsLocal(offsetsPartition)) {
+        if (loadingPartitions synchronized loadingPartitions.contains(offsetsPartition)) {
+          debug("Cannot fetch offsets for group %s due to ongoing offset load.".format(group))
+          topicPartitions.map { topicAndPartition =>
+            val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
+            (groupTopicPartition.topicPartition, OffsetMetadataAndError.OffsetsLoading)
+          }.toMap
+        } else {
+          if (topicPartitions.size == 0) {
+            // Return offsets for all partitions owned by this consumer group. (This only applies to consumers that commit offsets to Kafka.)
+            offsetsCache.filter(_._1.group == group).map { case(groupTopicPartition, offsetAndMetadata) =>
+              (groupTopicPartition.topicPartition, OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError))
+            }.toMap
+          } else {
+            topicPartitions.map { topicAndPartition =>
+              val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
+              (groupTopicPartition.topicPartition, getOffset(groupTopicPartition))
+            }.toMap
+          }
+        }
+      } else {
+        debug("Could not fetch offsets for group %s (not offset coordinator).".format(group))
+        topicPartitions.map { topicAndPartition =>
+          val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
+          (groupTopicPartition.topicPartition, OffsetMetadataAndError.NotOffsetManagerForGroup)
+        }.toMap
+      }
+    }
+  }
+
+  /**
+   * Asynchronously read the partition from the offsets topic and populate the cache
+   */
+  def loadOffsetsFromLog(offsetsPartition: Int) {
+    
+    val topicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
+
+    loadingPartitions synchronized {
+      if (loadingPartitions.contains(offsetsPartition)) {
+        info("Offset load from %s already in progress.".format(topicPartition))
+      } else {
+        loadingPartitions.add(offsetsPartition)
+        scheduler.schedule(topicPartition.toString, loadOffsets)
+      }
+    }
+
+    def loadOffsets() {
+      info("Loading offsets from " + topicPartition)
+
+      val startMs = SystemTime.milliseconds
+      try {
+        replicaManager.logManager.getLog(topicPartition) match {
+          case Some(log) =>
+            var currOffset = log.logSegments.head.baseOffset
+            val buffer = ByteBuffer.allocate(config.loadBufferSize)
+            // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
+            while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
+              buffer.clear()
+              val messages = log.read(currOffset, config.loadBufferSize).asInstanceOf[FileMessageSet]
+              messages.readInto(buffer, 0)
+              val messageSet = new ByteBufferMessageSet(buffer)
+              messageSet.foreach { msgAndOffset =>
+                require(msgAndOffset.message.key != null, "Offset entry key should not be null")
+                val key = OffsetManager.readMessageKey(msgAndOffset.message.key)
+                if (msgAndOffset.message.payload == null) {
+                  if (offsetsCache.remove(key) != null)
+                    trace("Removed offset for %s due to tombstone entry.".format(key))
+                  else
+                    trace("Ignoring redundant tombstone for %s.".format(key))
+                } else {
+                  val value = OffsetManager.readMessageValue(msgAndOffset.message.payload)
+                  putOffset(key, value)
+                  trace("Loaded offset %s for %s.".format(value, key))
+                }
+                currOffset = msgAndOffset.nextOffset
+              }
+            }
+
+            if (!shuttingDown.get())
+              info("Finished loading offsets from %s in %d milliseconds."
+                   .format(topicPartition, SystemTime.milliseconds - startMs))
+          case None =>
+            warn("No log found for " + topicPartition)
+        }
+      }
+      catch {
+        case t: Throwable =>
+          error("Error in loading offsets from " + topicPartition, t)
+      }
+      finally {
+        loadingPartitions synchronized loadingPartitions.remove(offsetsPartition)
+      }
+    }
+  }
+
+  private def getHighWatermark(partitionId: Int): Long = {
+    val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, partitionId)
+
+    val hw = partitionOpt.map { partition =>
+      partition.leaderReplicaIfLocal().map(_.highWatermark).getOrElse(-1L)
+    }.getOrElse(-1L)
+
+    hw
+  }
+
+  private def leaderIsLocal(partition: Int) = { getHighWatermark(partition) != -1L }
+
+  /**
+   * When this broker becomes a follower for an offsets topic partition clear out the cache for groups that belong to
+   * that partition.
+   * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
+   */
+  def clearOffsetsInPartition(offsetsPartition: Int) {
+    debug("Deleting offset entries belonging to [%s,%d].".format(OffsetManager.OffsetsTopicName, offsetsPartition))
+
+    followerTransitionLock synchronized {
+      offsetsCache.keys.foreach { key =>
+        if (partitionFor(key.group) == offsetsPartition) {
+          offsetsCache.remove(key)
+        }
+      }
+    }
+  }
+
+  def shutdown() {
+    shuttingDown.set(true)
+  }
+
+}
+
+object OffsetManager {
+
+  val OffsetsTopicName = "__consumer_offsets"
+
+  private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema)
+
+  private val CURRENT_OFFSET_SCHEMA_VERSION = 0.toShort
+
+  private val OFFSET_COMMIT_KEY_SCHEMA_V0 = new Schema(new Field("group", STRING),
+                                                       new Field("topic", STRING),
+                                                       new Field("partition", INT32))
+  private val KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("group")
+  private val KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("topic")
+  private val KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("partition")
+
+  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
+                                                         new Field("metadata", STRING, "Associated metadata.", ""),
+                                                         new Field("timestamp", INT64))
+  private val VALUE_OFFSET_FIELD = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
+  private val VALUE_METADATA_FIELD = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
+  private val VALUE_TIMESTAMP_FIELD = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
+
+  // map of versions to schemas
+  private val OFFSET_SCHEMAS = Map(0 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V0))
+
+  private val CURRENT_SCHEMA = schemaFor(CURRENT_OFFSET_SCHEMA_VERSION)
+
+  private def schemaFor(version: Int) = {
+    val schemaOpt = OFFSET_SCHEMAS.get(version)
+    schemaOpt match {
+      case Some(schema) => schema
+      case _ => throw new KafkaException("Unknown offset schema version " + version)
+    }
+  }
+
+  /**
+   * Generates the key for an offset commit message for the given (group, topic, partition)
+   *
+   * @return key for offset commit message
+   */
+  def offsetCommitKey(group: String, topic: String, partition: Int, versionId: Short = 0): Array[Byte] = {
+    val key = new Struct(CURRENT_SCHEMA.keySchema)
+    key.set(KEY_GROUP_FIELD, group)
+    key.set(KEY_TOPIC_FIELD, topic)
+    key.set(KEY_PARTITION_FIELD, partition)
+
+    val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
+    byteBuffer.putShort(CURRENT_OFFSET_SCHEMA_VERSION)
+    key.writeTo(byteBuffer)
+    byteBuffer.array()
+  }
+
+  /**
+   * Generates the payload for an offset commit message from the given offset and metadata
+   *
+   * @param offsetAndMetadata consumer's current offset and metadata
+   * @return payload for offset commit message
+   */
+  def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
+    val value = new Struct(CURRENT_SCHEMA.valueSchema)
+    value.set(VALUE_OFFSET_FIELD, offsetAndMetadata.offset)
+    value.set(VALUE_METADATA_FIELD, offsetAndMetadata.metadata)
+    value.set(VALUE_TIMESTAMP_FIELD, offsetAndMetadata.timestamp)
+
+    val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
+    byteBuffer.putShort(CURRENT_OFFSET_SCHEMA_VERSION)
+    value.writeTo(byteBuffer)
+    byteBuffer.array()
+  }
+
+  /**
+   * Decodes the offset messages' key
+   *
+   * @param buffer input byte-buffer
+   * @return a GroupTopicPartition object
+   */
+  def readMessageKey(buffer: ByteBuffer): GroupTopicPartition = {
+    val version = buffer.getShort()
+    val keySchema = schemaFor(version).keySchema
+    val key = keySchema.read(buffer).asInstanceOf[Struct]
+
+    val group = key.get(KEY_GROUP_FIELD).asInstanceOf[String]
+    val topic = key.get(KEY_TOPIC_FIELD).asInstanceOf[String]
+    val partition = key.get(KEY_PARTITION_FIELD).asInstanceOf[Int]
+
+    GroupTopicPartition(group, TopicAndPartition(topic, partition))
+  }
+
+  /**
+   * Decodes an offset commit message's payload and retrieves the offset and metadata from it
+   *
+   * @param buffer input byte-buffer
+   * @return an offset-metadata object from the message
+   */
+  def readMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
+    if(buffer == null) { // tombstone
+      null
+    } else {
+      val version = buffer.getShort()
+      val valueSchema = schemaFor(version).valueSchema
+      val value = valueSchema.read(buffer).asInstanceOf[Struct]
+
+      val offset = value.get(VALUE_OFFSET_FIELD).asInstanceOf[Long]
+      val metadata = value.get(VALUE_METADATA_FIELD).asInstanceOf[String]
+      val timestamp = value.get(VALUE_TIMESTAMP_FIELD).asInstanceOf[Long]
+
+      OffsetAndMetadata(offset, metadata, timestamp)
+    }
+  }
+
+  // Formatter for use with tools such as the console consumer. The consumer should also set exclude.internal.topics to false.
+  // (specify --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
+  class OffsetsMessageFormatter extends MessageFormatter {
+    def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+      val formattedKey = if (key == null) "NULL" else OffsetManager.readMessageKey(ByteBuffer.wrap(key)).toString
+      val formattedValue = if (value == null) "NULL" else OffsetManager.readMessageValue(ByteBuffer.wrap(value)).toString
+      output.write(formattedKey.getBytes)
+      output.write("::".getBytes)
+      output.write(formattedValue.getBytes)
+      output.write("\n".getBytes)
+    }
+  }
+
+}
+
+case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {
+
+  def this(group: String, topic: String, partition: Int) =
+    this(group, new TopicAndPartition(topic, partition))
+
+  override def toString =
+    "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
+
+}
+
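For reference, the helpers above round-trip as follows. This is a minimal sketch, assuming the companion object lives at kafka.server.OffsetManager and that OffsetAndMetadata is the three-argument kafka.common case class used in readMessageValue; names such as "my-group" are placeholders.

    import java.nio.ByteBuffer
    import kafka.common.OffsetAndMetadata   // assumed location, per its use in readMessageValue
    import kafka.server.OffsetManager

    object OffsetMessageRoundTrip extends App {
      // Encode the key and value the same way the offset manager appends them to the
      // offsets topic: a 2-byte schema version followed by the versioned struct.
      val keyBytes   = OffsetManager.offsetCommitKey("my-group", "my-topic", 0)
      val valueBytes = OffsetManager.offsetCommitValue(
        OffsetAndMetadata(42L, "checkpoint", System.currentTimeMillis()))

      // Decode them back through the version-dispatching readers.
      val gtp = OffsetManager.readMessageKey(ByteBuffer.wrap(keyBytes))
      val oam = OffsetManager.readMessageValue(ByteBuffer.wrap(valueBytes))

      println(gtp + "::" + oam)   // key::value, same separator as OffsetsMessageFormatter
    }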

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index fb759d9..f16fbe6 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -205,7 +205,8 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
+  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest,
+                             offsetManager: OffsetManager): (collection.Map[(String, Int), Short], Short) = {
     leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
       stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
                                 .format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId,
@@ -255,8 +256,10 @@ class ReplicaManager(val config: KafkaConfig,
           .filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
         val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
 
-        if (!partitionsTobeLeader.isEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap)
-        if (!partitionsToBeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
+        if (!partitionsTobeLeader.isEmpty)
+          makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap, offsetManager)
+        if (!partitionsToBeFollower.isEmpty)
+          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap, offsetManager)
 
         // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
         // have been completely populated before starting the checkpointing there by avoiding weird race conditions
@@ -283,7 +286,8 @@ class ReplicaManager(val config: KafkaConfig,
    */
   private def makeLeaders(controllerId: Int, epoch: Int,
                           partitionState: Map[Partition, PartitionStateInfo],
-                          correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) = {
+                          correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
+                          offsetManager: OffsetManager) = {
     partitionState.foreach(state =>
       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
         "starting the become-leader transition for partition %s")
@@ -302,7 +306,7 @@ class ReplicaManager(val config: KafkaConfig,
       }
       // Update the partition information to be the leader
       partitionState.foreach{ case (partition, partitionStateInfo) =>
-        partition.makeLeader(controllerId, partitionStateInfo, correlationId)}
+        partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)}
 
       // Finally add these partitions to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
@@ -344,7 +348,8 @@ class ReplicaManager(val config: KafkaConfig,
    * the error message will be set on each partition since we do not know which partition caused it
    */
   private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
-                            leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) {
+                            leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
+                            offsetManager: OffsetManager) {
     partitionState.foreach { state =>
       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
         "starting the become-follower transition for partition %s")
@@ -367,7 +372,7 @@ class ReplicaManager(val config: KafkaConfig,
         val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
         leaders.find(_.id == newLeaderBrokerId) match {
           case Some(leaderBroker) =>
-            if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
+            if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager))
               partitionsToMakeFollower += partition
             else
               stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 33d7c2c..88f824f 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -20,16 +20,22 @@ package kafka.tools
 
 import joptsimple._
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{Json, ZkUtils, ZKStringSerializer, Logging}
+import kafka.utils._
 import kafka.consumer.SimpleConsumer
-import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
-import kafka.common.{BrokerNotAvailableException, TopicAndPartition}
+import kafka.api.{OffsetFetchResponse, OffsetFetchRequest, OffsetRequest}
+import kafka.common.{OffsetMetadataAndError, ErrorMapping, BrokerNotAvailableException, TopicAndPartition}
 import scala.collection._
+import kafka.client.ClientUtils
+import kafka.network.BlockingChannel
+import kafka.api.PartitionOffsetRequestInfo
+import scala.Some
 
 
 object ConsumerOffsetChecker extends Logging {
 
   private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
+  private val offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map()
+  private var topicPidMap: immutable.Map[String, Seq[Int]] = immutable.Map()
 
   private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
     try {
@@ -49,18 +55,17 @@ object ConsumerOffsetChecker extends Logging {
       }
     } catch {
       case t: Throwable =>
-        error("Could not parse broker info", t)
+        println("Could not parse broker info due to " + t.getCause)
         None
     }
   }
 
   private def processPartition(zkClient: ZkClient,
                                group: String, topic: String, pid: Int) {
-    val offset = ZkUtils.readData(zkClient, "/consumers/%s/offsets/%s/%s".
-            format(group, topic, pid))._1.toLong
-    val owner = ZkUtils.readDataMaybeNull(zkClient, "/consumers/%s/owners/%s/%s".
-            format(group, topic, pid))._1
-
+    val topicPartition = TopicAndPartition(topic, pid)
+    val offsetOpt = offsetMap.get(topicPartition)
+    val groupDirs = new ZKGroupTopicDirs(group, topic)
+    val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/%s".format(pid))._1
     ZkUtils.getLeaderForPartition(zkClient, topic, pid) match {
       case Some(bid) =>
         val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkClient, bid))
@@ -71,19 +76,18 @@ object ConsumerOffsetChecker extends Logging {
               OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
             val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
 
-            val lag = logSize - offset
-            println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offset, logSize, lag,
-              owner match {case Some(ownerStr) => ownerStr case None => "none"}))
+            val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
+            println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
+                                                                   owner match {case Some(ownerStr) => ownerStr case None => "none"}))
           case None => // ignore
         }
       case None =>
-        error("No broker for partition %s - %s".format(topic, pid))
+        println("No broker for partition %s - %s".format(topic, pid))
     }
   }
 
   private def processTopic(zkClient: ZkClient, group: String, topic: String) {
-    val pidMap = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic))
-    pidMap.get(topic) match {
+    topicPidMap.get(topic) match {
       case Some(pids) =>
         pids.sorted.foreach {
           pid => processPartition(zkClient, group, topic, pid)
@@ -105,13 +109,18 @@ object ConsumerOffsetChecker extends Logging {
   def main(args: Array[String]) {
     val parser = new OptionParser()
 
-    val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.").
-            withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]);
+    val zkConnectOpt = parser.accepts("zookeeper", "ZooKeeper connect string.").
+            withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String])
     val topicsOpt = parser.accepts("topic",
             "Comma-separated list of consumer topics (all topics if absent).").
             withRequiredArg().ofType(classOf[String])
     val groupOpt = parser.accepts("group", "Consumer group.").
             withRequiredArg().ofType(classOf[String])
+    val channelSocketTimeoutMsOpt = parser.accepts("socket.timeout.ms", "Socket timeout to use when querying for offsets.").
+            withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(6000)
+    val channelRetryBackoffMsOpt = parser.accepts("retry.backoff.ms", "Retry back-off to use for failed offset queries.").
+            withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(3000)
+
     parser.accepts("broker-info", "Print broker info")
     parser.accepts("help", "Print this message.")
 
@@ -122,7 +131,7 @@ object ConsumerOffsetChecker extends Logging {
        System.exit(0)
     }
 
-    for (opt <- List(groupOpt))
+    for (opt <- List(groupOpt, zkConnectOpt))
       if (!options.has(opt)) {
         System.err.println("Missing required argument: %s".format(opt))
         parser.printHelpOn(System.err)
@@ -130,23 +139,50 @@ object ConsumerOffsetChecker extends Logging {
       }
 
     val zkConnect = options.valueOf(zkConnectOpt)
+
     val group = options.valueOf(groupOpt)
-    val topics = if (options.has(topicsOpt)) Some(options.valueOf(topicsOpt))
-      else None
+    val groupDirs = new ZKGroupDirs(group)
 
+    val channelSocketTimeoutMs = options.valueOf(channelSocketTimeoutMsOpt).intValue()
+    val channelRetryBackoffMs = options.valueOf(channelRetryBackoffMsOpt).intValue()
+
+    val topics = if (options.has(topicsOpt)) Some(options.valueOf(topicsOpt)) else None
 
     var zkClient: ZkClient = null
+    var channel: BlockingChannel = null
     try {
       zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
 
       val topicList = topics match {
         case Some(x) => x.split(",").view.toList
-        case None => ZkUtils.getChildren(
-          zkClient, "/consumers/%s/offsets".format(group)).toList
+        case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir +  "/owners").toList
       }
 
-      debug("zkConnect = %s; topics = %s; group = %s".format(
-        zkConnect, topicList.toString(), group))
+      topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*)
+      val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
+
+      channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) // assign (not shadow) the outer var so the finally block can disconnect it
+      
+      debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
+      channel.send(OffsetFetchRequest(group, topicPartitions))
+      val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
+      debug("Received offset fetch response %s.".format(offsetFetchResponse))
+      
+      offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
+        if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
+          val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
+          // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool),
+          // so the lag may be off until all the consumers in the group have the same setting for offsets storage
+          val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
+          offsetMap.put(topicAndPartition, offset)
+        }
+        else if (offsetAndMetadata.error == ErrorMapping.NoError)
+          offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
+        else {
+          println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
+        }
+      }
+      channel.disconnect()
 
       println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner"))
       topicList.sorted.foreach {
@@ -154,7 +190,7 @@ object ConsumerOffsetChecker extends Logging {
       }
 
       if (options.has("broker-info"))
-        printBrokerInfo();
+        printBrokerInfo()
 
       for ((_, consumerOpt) <- consumerMap)
         consumerOpt match {
@@ -162,6 +198,10 @@ object ConsumerOffsetChecker extends Logging {
           case None => // ignore
         }
     }
+    catch {
+      case t: Throwable =>
+        println("Exiting due to: %s.".format(t.getMessage))
+    }
     finally {
       for (consumerOpt <- consumerMap.values) {
         consumerOpt match {
@@ -171,6 +211,9 @@ object ConsumerOffsetChecker extends Logging {
       }
       if (zkClient != null)
         zkClient.close()
+
+      if (channel != null)
+        channel.disconnect()
     }
   }
 }
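The Kafka-based offset lookup above can also be exercised on its own; the sketch below mirrors the channelToOffsetManager / OffsetFetchRequest flow introduced by this patch. The ZooKeeper address, group, topic, and timeouts are placeholders, and the request/response signatures are taken from this diff rather than verified against the full tree.

    import org.I0Itec.zkclient.ZkClient
    import kafka.api.{OffsetFetchRequest, OffsetFetchResponse}
    import kafka.client.ClientUtils
    import kafka.common.{ErrorMapping, TopicAndPartition}
    import kafka.utils.ZKStringSerializer

    object OffsetFetchSketch extends App {
      val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
      val partitions = Seq(TopicAndPartition("my-topic", 0))
      // Locate the group's offset manager (coordinator) and open a blocking channel to it.
      val channel = ClientUtils.channelToOffsetManager("my-group", zkClient, 6000, 3000)
      try {
        channel.send(OffsetFetchRequest("my-group", partitions))
        val response = OffsetFetchResponse.readFrom(channel.receive().buffer)
        response.requestInfo.foreach { case (tp, offsetAndMetadata) =>
          if (offsetAndMetadata.error == ErrorMapping.NoError)
            println("%s -> %d".format(tp, offsetAndMetadata.offset))
          else
            println("%s -> %s".format(tp, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
        }
      } finally {
        channel.disconnect()
        zkClient.close()
      }
    }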

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 14f44d9..f0ab02a 100644
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -62,7 +62,7 @@ object DumpLogSegments {
       val file = new File(arg)
       if(file.getName.endsWith(Log.LogFileSuffix)) {
         println("Dumping " + file)
-        dumpLog(file, print, nonConsecutivePairsForLogFilesMap, isDeepIteration)
+        dumpLog(file, print, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize)
       } else if(file.getName.endsWith(Log.IndexFileSuffix)) {
         println("Dumping " + file)
         dumpIndex(file, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
@@ -117,13 +117,15 @@ object DumpLogSegments {
   private def dumpLog(file: File,
                       printContents: Boolean,
                       nonConsecutivePairsForLogFilesMap: mutable.HashMap[String, List[(Long, Long)]],
-                      isDeepIteration: Boolean) {
+                      isDeepIteration: Boolean,
+                      maxMessageSize: Int) {
     val startOffset = file.getName().split("\\.")(0).toLong
     println("Starting offset: " + startOffset)
     val messageSet = new FileMessageSet(file, false)
     var validBytes = 0L
     var lastOffset = -1l
-    for(shallowMessageAndOffset <- messageSet) { // this only does shallow iteration
+    val shallowIterator = messageSet.iterator(maxMessageSize)
+    for(shallowMessageAndOffset <- shallowIterator) { // this only does shallow iteration
       val itr = getIterator(shallowMessageAndOffset, isDeepIteration)
       for (messageAndOffset <- itr) {
         val msg = messageAndOffset.message

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index f0f871c..e4d1a86 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -22,7 +22,6 @@ import kafka.utils.{Utils, CommandLineUtils, Logging}
 import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
 import scala.collection.JavaConversions._
 import java.util.concurrent.CountDownLatch
-import java.nio.ByteBuffer
 import kafka.consumer._
 import kafka.serializer._
 import collection.mutable.ListBuffer

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 5e8c56d..a649461 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -118,7 +118,10 @@ object ReplicaVerificationTool extends Logging {
     val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs)
     val brokerMap = topicsMetadataResponse.extractBrokers(topicsMetadataResponse.topicsMetadata)
     val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter(
-        topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic)) true else false
+        topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false))
+          true
+        else
+          false
     )
     val topicPartitionReplicaList: Seq[TopicPartitionReplica] = filteredTopicMetadata.flatMap(
       topicMetadataResponse =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index eac9af2..92c0d1f 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -26,7 +26,7 @@ object VerifyConsumerRebalance extends Logging {
     val parser = new OptionParser()
 
     val zkConnectOpt = parser.accepts("zookeeper.connect", "ZooKeeper connect string.").
-      withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]);
+      withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String])
     val groupOpt = parser.accepts("group", "Consumer group.").
       withRequiredArg().ofType(classOf[String])
     parser.accepts("help", "Print this message.")
@@ -78,7 +78,7 @@ object VerifyConsumerRebalance extends Logging {
      * This means that for each partition registered under /brokers/topics/[topic]/[broker-id], an owner exists
      * under /consumers/[consumer_group]/owners/[topic]/[broker_id-partition_id]
      */
-    val consumersPerTopicMap = ZkUtils.getConsumersPerTopic(zkClient, group)
+    val consumersPerTopicMap = ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics = false)
     val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, consumersPerTopicMap.keySet.toSeq)
 
     partitionsPerTopicMap.foreach { partitionsForTopic =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index b070bb4..d40b03c 100644
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -20,6 +20,8 @@ package kafka.utils
 import java.util.Properties
 import java.util.Collections
 import scala.collection._
+import kafka.message.{CompressionCodec, NoCompressionCodec}
+
 
 class VerifiableProperties(val props: Properties) extends Logging {
   private val referenceSet = mutable.HashSet[String]()
@@ -193,6 +195,24 @@ class VerifiableProperties(val props: Properties) extends Logging {
     }
   }
 
+  /**
+   * Parse the compression codec from a property list. Codecs may be specified either as integers or as strings.
+   * See [[kafka.message.CompressionCodec]] for more details.
+   * @param name The property name
+   * @param default Default compression codec
+   * @return compression codec
+   */
+  def getCompressionCodec(name: String, default: CompressionCodec) = {
+    val prop = getString(name, default.name)
+    try {
+      CompressionCodec.getCompressionCodec(prop.toInt)
+    }
+    catch {
+      case nfe: NumberFormatException =>
+        CompressionCodec.getCompressionCodec(prop)
+    }
+  }
+
   def verify() {
     info("Verifying properties")
     val propNames = {
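A small usage sketch for the new helper follows: both the numeric and the string spelling of a codec resolve to the same CompressionCodec, which is the fallback behaviour implemented above. The property name is arbitrary, and the numeric id 1 for gzip is an assumption about kafka.message.CompressionCodec.

    import java.util.Properties
    import kafka.message.{GZIPCompressionCodec, NoCompressionCodec}
    import kafka.utils.VerifiableProperties

    object CompressionCodecPropertySketch extends App {
      val byName = new Properties()
      byName.put("my.compression.codec", "gzip")   // string form

      val byId = new Properties()
      byId.put("my.compression.codec", "1")        // integer form; assumes gzip's codec id is 1

      val a = new VerifiableProperties(byName).getCompressionCodec("my.compression.codec", NoCompressionCodec)
      val b = new VerifiableProperties(byId).getCompressionCodec("my.compression.codec", NoCompressionCodec)
      println(a == b && a == GZIPCompressionCodec)  // expected: true
    }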

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index a198628..16bf7e3 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -676,12 +676,12 @@ object ZkUtils extends Logging {
     getChildren(zkClient, dirs.consumerRegistryDir)
   }
 
-  def getConsumersPerTopic(zkClient: ZkClient, group: String) : mutable.Map[String, List[String]] = {
+  def getConsumersPerTopic(zkClient: ZkClient, group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[String]] = {
     val dirs = new ZKGroupDirs(group)
     val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir)
     val consumersPerTopicMap = new mutable.HashMap[String, List[String]]
     for (consumer <- consumers) {
-      val topicCount = TopicCount.constructTopicCount(group, consumer, zkClient)
+      val topicCount = TopicCount.constructTopicCount(group, consumer, zkClient, excludeInternalTopics)
       for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) {
         for (consumerThreadId <- consumerThreadIdSet)
           consumersPerTopicMap.get(topic) match {

