kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject svn commit: r1239937 - in /incubator/kafka/branches/0.8: ./ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/log/ core/src/main/scala/kafka/producer/ core/src/main/scala/kafka/tools/ core/src/main/scala/k...
Date Thu, 02 Feb 2012 23:40:40 GMT
Author: nehanarkhede
Date: Thu Feb  2 23:40:39 2012
New Revision: 1239937

URL: http://svn.apache.org/viewvc?rev=1239937&view=rev
Log:
Merging commits 1230840:1239902 from trunk

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
      - copied unchanged from r1239902, incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
Modified:
    incubator/kafka/branches/0.8/   (props changed)
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Cluster.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/Fetcher.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
    incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh
    incubator/kafka/branches/0.8/system_test/broker_failure/config/log4j.properties

Propchange: incubator/kafka/branches/0.8/
------------------------------------------------------------------------------
    svn:mergeinfo = /incubator/kafka/trunk:1230841-1239902

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Cluster.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Cluster.scala?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Cluster.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Cluster.scala Thu Feb  2 23:40:39 2012
@@ -32,7 +32,7 @@ private[kafka] class Cluster {
       brokers.put(broker.id, broker)
   }
 
-  def getBroker(id: Int) = brokers.get(id).get
+  def getBroker(id: Int): Option[Broker] = brokers.get(id)
   
   def add(broker: Broker) = brokers.put(broker.id, broker)
   

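With getBroker now returning an Option, call sites must handle a missing broker explicitly. As a rough sketch (not part of this patch; the helper name and error message are illustrative), the pattern used by the updated callers below looks like:

    import kafka.cluster.{Broker, Cluster}

    // hypothetical helper mirroring the call sites in Fetcher and UpdateOffsetsInZK
    def brokerOrFail(cluster: Cluster, id: Int): Broker =
      cluster.getBroker(id) match {
        case Some(broker) => broker   // broker is registered in the cluster
        case None => throw new IllegalStateException("Broker " + id + " is unavailable")
      }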
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Thu Feb  2 23:40:39 2012
@@ -24,7 +24,7 @@ import kafka.common.InvalidConfigExcepti
 object ConsumerConfig {
   val SocketTimeout = 30 * 1000
   val SocketBufferSize = 64*1024
-  val FetchSize = 300 * 1024
+  val FetchSize = 1024 * 1024
   val MaxFetchSize = 10*FetchSize
   val DefaultFetcherBackoffMs = 1000
   val AutoCommit = true
@@ -62,9 +62,6 @@ class ConsumerConfig(props: Properties) 
   /** the number of byes of messages to attempt to fetch */
   val fetchSize = Utils.getInt(props, "fetch.size", FetchSize)
   
-  /** the maximum allowable fetch size for a very large message */
-  val maxFetchSize: Int = fetchSize * 10
-  
   /** to avoid repeatedly polling a broker node which has no new data
       we will backoff every time we get an empty set from the broker*/
   val fetcherBackoffMs: Long = Utils.getInt(props, "fetcher.backoff.ms", DefaultFetcherBackoffMs)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Thu Feb  2 23:40:39 2012
@@ -58,6 +58,8 @@ class ConsumerIterator[T](private val to
       else {
         currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
         if (currentDataChunk == null) {
+          // reset state to make the iterator re-iterable
+          resetState()
           throw new ConsumerTimeoutException
         }
       }

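Resetting the iterator state before throwing means a consumer can catch ConsumerTimeoutException and keep using the same stream iterator. A rough consumer-side sketch, assuming a KafkaMessageStream obtained from createMessageStreams and a hypothetical handle callback (neither is code from this patch):

    import kafka.consumer.{ConsumerTimeoutException, KafkaMessageStream}

    // drain a stream, tolerating timeouts; requires consumer.timeout.ms > 0
    def drain(stream: KafkaMessageStream[String], handle: String => Unit) {
      val it = stream.iterator
      try {
        while (it.hasNext)
          handle(it.next)
      } catch {
        case e: ConsumerTimeoutException =>
          // no data within consumer.timeout.ms; the iterator state was reset,
          // so hasNext/next can be called again on a later attempt
      }
    }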
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/Fetcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/Fetcher.scala?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/Fetcher.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/Fetcher.scala Thu Feb  2 23:40:39 2012
@@ -22,6 +22,7 @@ import kafka.cluster._
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.BlockingQueue
 import kafka.utils._
+import java.lang.IllegalStateException
 
 /**
  * The fetcher is a background thread that fetches data from a set of servers
@@ -73,7 +74,13 @@ private [consumer] class Fetcher(val con
 
     // open a new fetcher thread for each broker
     val ids = Set() ++ topicInfos.map(_.brokerId)
-    val brokers = ids.map(cluster.getBroker(_))
+    val brokers = ids.map { id =>
+      cluster.getBroker(id) match {
+        case Some(broker) => broker
+        case None => throw new IllegalStateException("Broker " + id + " is unavailable, fetchers could not be started")
+      }
+    }
+
     fetcherThreads = new Array[FetcherRunnable](brokers.size)
     var i = 0
     for(broker <- brokers) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala Thu Feb  2 23:40:39 2012
@@ -21,7 +21,7 @@ import scala.collection._
 import scala.util.parsing.json.JSON
 import kafka.utils.Logging
 
-private[consumer] object TopicCount extends Logging {
+private[kafka] object TopicCount extends Logging {
   val myConversionFunc = {input : String => input.toInt}
   JSON.globalNumberParser = myConversionFunc
 
@@ -44,7 +44,7 @@ private[consumer] object TopicCount exte
 
 }
 
-private[consumer] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) {
+private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) {
 
   def getConsumerThreadIdsPerTopic()
     : Map[String, Set[String]] = {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Feb  2 23:40:39 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -30,6 +30,8 @@ import kafka.api.OffsetRequest
 import java.util.UUID
 import kafka.serializer.Decoder
 import kafka.common.{ConsumerRebalanceFailedException, InvalidConfigException}
+import java.lang.IllegalStateException
+import kafka.utils.ZkUtils._
 
 /**
  * This class handles the consumers interaction with zookeeper
@@ -157,7 +159,7 @@ private[kafka] class ZookeeperConsumerCo
 
     var consumerUuid : String = null
     config.consumerId match {
-      case Some(consumerId) // for testing only 
+      case Some(consumerId) // for testing only
       => consumerUuid = consumerId
       case None // generate unique consumerId automatically
       => val uuid = UUID.randomUUID()
@@ -193,7 +195,7 @@ private[kafka] class ZookeeperConsumerCo
 
     ret.foreach { topicAndStreams =>
       // register on broker partition path changes
-      val partitionPath = ZkUtils.BrokerTopicsPath + "/" + topicAndStreams._1
+      val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
       zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
     }
 
@@ -204,7 +206,7 @@ private[kafka] class ZookeeperConsumerCo
 
   private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
     info("begin registering consumer " + consumerIdString + " in ZK")
-    ZkUtils.createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString)
+    createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString)
     info("end registering consumer " + consumerIdString + " in ZK")
   }
 
@@ -239,7 +241,7 @@ private[kafka] class ZookeeperConsumerCo
       for (info <- infos.values) {
         val newOffset = info.getConsumeOffset
         try {
-          ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name,
+          updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name,
             newOffset.toString)
         }
         catch {
@@ -289,7 +291,7 @@ private[kafka] class ZookeeperConsumerCo
     try {
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
       val znode = topicDirs.consumerOffsetDir + "/" + partition.name
-      val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode)
+      val offsetString = readDataMaybeNull(zkClient, znode)
       if (offsetString != null)
         return offsetString.toLong
       else
@@ -309,8 +311,12 @@ private[kafka] class ZookeeperConsumerCo
     var simpleConsumer: SimpleConsumer = null
     var producedOffset: Long = -1L
     try {
-      val cluster = ZkUtils.getCluster(zkClient)
-      val broker = cluster.getBroker(brokerId)
+      val cluster = getCluster(zkClient)
+      val broker = cluster.getBroker(brokerId) match {
+        case Some(b) => b
+        case None => throw new IllegalStateException("Broker " + brokerId + " is unavailable. Cannot issue " +
+          "getOffsetsBefore request")
+      }
       simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
                                             ConsumerConfig.SocketBufferSize)
       val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, earliestOrLatest, 1)
@@ -358,7 +364,7 @@ private[kafka] class ZookeeperConsumerCo
       loadBalancerListener.syncedRebalance
 
       // There is no need to resubscribe to child and state changes.
-      // The child change watchers will be set inside rebalance when we read the children list. 
+      // The child change watchers will be set inside rebalance when we read the children list.
     }
 
   }
@@ -376,34 +382,17 @@ private[kafka] class ZookeeperConsumerCo
     }
 
     private def releasePartitionOwnership()= {
+      info("Releasing partition ownership")
       for ((topic, infos) <- topicRegistry) {
         val topicDirs = new ZKGroupTopicDirs(group, topic)
         for(partition <- infos.keys) {
           val znode = topicDirs.consumerOwnerDir + "/" + partition
-          ZkUtils.deletePath(zkClient, znode)
+          deletePath(zkClient, znode)
           debug("Consumer " + consumerIdString + " releasing " + znode)
         }
       }
     }
 
-    private def getConsumersPerTopic(group: String) : mutable.Map[String, List[String]] = {
-      val consumers = ZkUtils.getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir)
-      val consumersPerTopicMap = new mutable.HashMap[String, List[String]]
-      for (consumer <- consumers) {
-        val topicCount = getTopicCount(consumer)
-        for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic()) {
-          for (consumerThreadId <- consumerThreadIdSet)
-            consumersPerTopicMap.get(topic) match {
-              case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)
-              case _ => consumersPerTopicMap.put(topic, List(consumerThreadId))
-            }
-        }
-      }
-      for ( (topic, consumerList) <- consumersPerTopicMap )
-        consumersPerTopicMap.put(topic, consumerList.sortWith((s,t) => s < t))
-      consumersPerTopicMap
-    }
-
     private def getRelevantTopicMap(myTopicThreadIdsMap: Map[String, Set[String]],
                                     newPartMap: Map[String,List[String]],
                                     oldPartMap: Map[String,List[String]],
@@ -416,11 +405,6 @@ private[kafka] class ZookeeperConsumerCo
       relevantTopicThreadIdsMap
     }
 
-    private def getTopicCount(consumerId: String) : TopicCount = {
-      val topicCountJson = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
-      TopicCount.constructTopicCount(consumerId, topicCountJson)
-    }
-
     def resetState() {
       topicRegistry.clear
       oldConsumersPerTopicMap.clear
@@ -432,19 +416,34 @@ private[kafka] class ZookeeperConsumerCo
         for (i <- 0 until config.maxRebalanceRetries) {
           info("begin rebalancing consumer " + consumerIdString + " try #" + i)
           var done = false
+          val cluster = getCluster(zkClient)
           try {
-            done = rebalance()
+            done = rebalance(cluster)
           }
           catch {
             case e =>
-              // occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
-              // For example, a ZK node can disappear between the time we get all children and the time we try to get
-              // the value of a child. Just let this go since another rebalance will be triggered.
+              /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
+               * For example, a ZK node can disappear between the time we get all children and the time we try to get
+               * the value of a child. Just let this go since another rebalance will be triggered.
+               **/
               info("exception during rebalance ", e)
+              /* Explicitly make sure another rebalancing attempt will get triggered. */
+              done = false
           }
           info("end rebalancing consumer " + consumerIdString + " try #" + i)
-          if (done)
+          if (done) {
             return
+          }else {
+              /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
+               * clear the cache */
+              info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
+              oldConsumersPerTopicMap.clear()
+              oldPartitionsPerTopicMap.clear()
+          }
+          // commit offsets
+          commitOffsets()
+          // stop all fetchers and clear all the queues to avoid data duplication
+          closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2))
           // release all partitions, reset state and retry
           releasePartitionOwnership()
           Thread.sleep(config.rebalanceBackoffMs)
@@ -454,26 +453,30 @@ private[kafka] class ZookeeperConsumerCo
       throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.maxRebalanceRetries +" retries")
     }
 
-    private def rebalance(): Boolean = {
-      val myTopicThreadIdsMap = getTopicCount(consumerIdString).getConsumerThreadIdsPerTopic
-      val cluster = ZkUtils.getCluster(zkClient)
-      val consumersPerTopicMap = getConsumersPerTopic(group)
-      val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
+    private def rebalance(cluster: Cluster): Boolean = {
+      val myTopicThreadIdsMap = getTopicCount(zkClient, group, consumerIdString).getConsumerThreadIdsPerTopic
+      val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
+      val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
       val relevantTopicThreadIdsMap = getRelevantTopicMap(myTopicThreadIdsMap, partitionsPerTopicMap, oldPartitionsPerTopicMap, consumersPerTopicMap, oldConsumersPerTopicMap)
       if (relevantTopicThreadIdsMap.size <= 0) {
-        info("Consumer " + consumerIdString + " with " + consumersPerTopicMap + " doesn't need to rebalance.")
+        info("Consumer %s with %s and topic partitions %s doesn't need to rebalance.".
+          format(consumerIdString, consumersPerTopicMap, partitionsPerTopicMap))
+        debug("Partitions per topic cache " + oldPartitionsPerTopicMap)
+        debug("Consumers per topic cache " + oldConsumersPerTopicMap)
         return true
       }
 
-      // fetchers must be stopped to avoid data duplication, since if the current
-      // rebalancing attempt fails, the partitions that are released could be owned by another consumer.
-      // But if we don't stop the fetchers first, this consumer would continue returning data for released
-      // partitions in parallel. So, not stopping the fetchers leads to duplicate data.
+      /**
+       * fetchers must be stopped to avoid data duplication, since if the current
+       * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
+       * But if we don't stop the fetchers first, this consumer would continue returning data for released
+       * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
+       */
       closeFetchers(cluster, kafkaMessageStreams, relevantTopicThreadIdsMap)
 
-      info("Releasing partition ownership")
       releasePartitionOwnership()
 
+      var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
       for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
         topicRegistry.remove(topic)
         topicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo])
@@ -505,41 +508,60 @@ private[kafka] class ZookeeperConsumerCo
               val partition = curPartitions(i)
               info(consumerThreadId + " attempting to claim partition " + partition)
               val ownPartition = processPartition(topicDirs, partition, topic, consumerThreadId)
-              if (ownPartition)
-                info(consumerThreadId + " successfully owned partition " + partition)
-              else
+              if (!ownPartition)
                 return false
+              else // record the partition ownership decision
+                partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
             }
           }
         }
       }
-      updateFetcher(cluster, kafkaMessageStreams)
-      oldPartitionsPerTopicMap = partitionsPerTopicMap
-      oldConsumersPerTopicMap = consumersPerTopicMap
-      true
+
+      /**
+       * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
+       * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
+       */
+      if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) {
+        info("Updating the cache")
+        debug("Partitions per topic cache " + partitionsPerTopicMap)
+        debug("Consumers per topic cache " + consumersPerTopicMap)
+        oldPartitionsPerTopicMap = partitionsPerTopicMap
+        oldConsumersPerTopicMap = consumersPerTopicMap
+        updateFetcher(cluster, kafkaMessageStreams)
+        true
+      }else
+        false
     }
 
-    private def closeFetchers(cluster: Cluster, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
-                              relevantTopicThreadIdsMap: Map[String, Set[String]]) {
-      // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
-      // after this rebalancing attempt
-      val queuesTobeCleared = queues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
+    private def closeFetchersForQueues(cluster: Cluster,
+                                       kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+                                       queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
       var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
       fetcher match {
         case Some(f) => f.stopConnectionsToAllBrokers
-        f.clearFetcherQueues(allPartitionInfos, cluster, queuesTobeCleared, kafkaMessageStreams)
+        f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, kafkaMessageStreams)
         info("Committing all offsets after clearing the fetcher queues")
-        // here, we need to commit offsets before stopping the consumer from returning any more messages
-        // from the current data chunk. Since partition ownership is not yet released, this commit offsets
-        // call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
-        // for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
-        // by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
-        // successfully and the fetchers restart to fetch more data chunks
+        /**
+        * here, we need to commit offsets before stopping the consumer from returning any more messages
+        * from the current data chunk. Since partition ownership is not yet released, this commit offsets
+        * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
+        * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
+        * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
+        * successfully and the fetchers restart to fetch more data chunks
+        **/
         commitOffsets
         case None =>
       }
     }
 
+    private def closeFetchers(cluster: Cluster, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+                              relevantTopicThreadIdsMap: Map[String, Set[String]]) {
+      // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
+      // after this rebalancing attempt
+      val queuesTobeCleared = queues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
+      closeFetchersForQueues(cluster, kafkaMessageStreams, queuesTobeCleared)
+    }
+
     private def updateFetcher[T](cluster: Cluster,
                                  kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
       // update partitions for fetcher
@@ -560,18 +582,47 @@ private[kafka] class ZookeeperConsumerCo
     private def processPartition(topicDirs: ZKGroupTopicDirs, partition: String,
                                  topic: String, consumerThreadId: String) : Boolean = {
       val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
-      try {
-        ZkUtils.createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
+      // check if some other consumer owns this partition at this time
+      val currentPartitionOwner = readDataMaybeNull(zkClient, partitionOwnerPath)
+      if(currentPartitionOwner != null) {
+        if(currentPartitionOwner.equals(consumerThreadId)) {
+          info(partitionOwnerPath + " exists with value " + currentPartitionOwner + " during connection loss; this is ok")
+          addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
+          true
+        }
+        else {
+          info(partitionOwnerPath + " exists with value " + currentPartitionOwner)
+          false
+        }
+      } else {
+        addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
+        true
       }
-      catch {
-        case e: ZkNodeExistsException =>
-        // The node hasn't been deleted by the original owner. So wait a bit and retry.
-          info("waiting for the partition ownership to be deleted: " + partition)
-          return false
-        case e2 => throw e2
+    }
+
+    private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = {
+      val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
+        val topic = partitionOwner._1._1
+        val partition = partitionOwner._1._2
+        val consumerThreadId = partitionOwner._2
+        val topicDirs = new ZKGroupTopicDirs(group, topic)
+        val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
+        try {
+          createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
+          info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
+          true
+        }
+        catch {
+          case e: ZkNodeExistsException =>
+            // The node hasn't been deleted by the original owner. So wait a bit and retry.
+            info("waiting for the partition ownership to be deleted: " + partition)
+            false
+          case e2 => throw e2
+        }
       }
-      addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
-      true
+      val success = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => if(decision) 0 else 1)
+      if(success > 0) false       /* even if one of the partition ownership attempt has failed, return false */
+      else true
     }
 
     private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partitionString: String,
@@ -580,7 +631,7 @@ private[kafka] class ZookeeperConsumerCo
       val partTopicInfoMap = topicRegistry.get(topic)
 
       val znode = topicDirs.consumerOffsetDir + "/" + partition.name
-      val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode)
+      val offsetString = readDataMaybeNull(zkClient, znode)
       // If first time starting a consumer, set the initial offset based on the config
       var offset : Long = 0L
       if (offsetString == null)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Thu Feb  2 23:40:39 2012
@@ -212,10 +212,18 @@ private[log] class Log(val dir: File, va
     
     // they are valid, insert them in the log
     lock synchronized {
-      val segment = segments.view.last
-      segment.messageSet.append(messages)
-      maybeFlush(numberOfMessages)
-      maybeRoll(segment)
+      try {
+        val segment = segments.view.last
+        segment.messageSet.append(messages)
+        maybeFlush(numberOfMessages)
+        maybeRoll(segment)
+      }
+      catch {
+        case e: IOException =>
+          fatal("Halting due to unrecoverable I/O error while handling producer request", e)
+          Runtime.getRuntime.halt(1)
+        case e2 => throw e2
+      }
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Thu Feb  2 23:40:39 2012
@@ -18,66 +18,77 @@
 package kafka.producer
 
 import async.MissingConfigException
-import org.apache.log4j.spi.LoggingEvent
+import org.apache.log4j.spi.{LoggingEvent, ErrorCode}
 import org.apache.log4j.AppenderSkeleton
-import kafka.utils.{Utils, Logging}
+import org.apache.log4j.helpers.LogLog
+import kafka.utils.Logging
 import kafka.serializer.Encoder
 import java.util.{Properties, Date}
-import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.message.Message
+import scala.collection._
 
 class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   var port:Int = 0
   var host:String = null
   var topic:String = null
-  var encoderClass:String = null
+  var serializerClass:String = null
+  var zkConnect:String = null
+  var brokerList:String = null
   
-  private var producer:SyncProducer = null
-  private var encoder: Encoder[AnyRef] = null
-  
-  def getPort:Int = port
-  def setPort(port: Int) = { this.port = port }
-
-  def getHost:String = host
-  def setHost(host: String) = { this.host = host }
+  private var producer: Producer[String, String] = null
 
   def getTopic:String = topic
-  def setTopic(topic: String) = { this.topic = topic }
+  def setTopic(topic: String) { this.topic = topic }
 
-  def getEncoder:String = encoderClass
-  def setEncoder(encoder: String) = { this.encoderClass = encoder }
+  def getZkConnect:String = zkConnect
+  def setZkConnect(zkConnect: String) { this.zkConnect = zkConnect }
   
-  override def activateOptions = {
+  def getBrokerList:String = brokerList
+  def setBrokerList(brokerList: String) { this.brokerList = brokerList }
+  
+  def getSerializerClass:String = serializerClass
+  def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass }
+
+  override def activateOptions() {
+    val connectDiagnostic : mutable.ListBuffer[String] = mutable.ListBuffer();
     // check for config parameter validity
-    if(host == null)
-      throw new MissingConfigException("Broker Host must be specified by the Kafka log4j appender")
-    if(port == 0)
-      throw new MissingConfigException("Broker Port must be specified by the Kafka log4j appender") 
+    val props = new Properties()
+    if( zkConnect == null) connectDiagnostic += "zkConnect"
+    else props.put("zk.connect", zkConnect);
+    if( brokerList == null) connectDiagnostic += "brokerList"
+    else if( props.isEmpty) props.put("broker.list", brokerList)
+    if(props.isEmpty )
+      throw new MissingConfigException(
+        connectDiagnostic mkString ("One of these connection properties must be specified: ", ", ", ".")
+      )
     if(topic == null)
       throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
-    if(encoderClass == null) {
-      info("Using default encoder - kafka.producer.DefaultStringEncoder")
-      encoder = Utils.getObject("kafka.producer.DefaultStringEncoder")
-    }else // instantiate the encoder, if present
-      encoder = Utils.getObject(encoderClass)
-    val props = new Properties()
-    props.put("host", host)
-    props.put("port", port.toString)
-    producer = new SyncProducer(new SyncProducerConfig(props))
-    info("Kafka producer connected to " + host + "," + port)
-    info("Logging for topic: " + topic)
+    if(serializerClass == null) {
+      serializerClass = "kafka.serializer.StringEncoder"
+      LogLog.warn("Using default encoder - kafka.serializer.StringEncoder")
+    }
+    props.put("serializer.class", serializerClass)
+    val config : ProducerConfig = new ProducerConfig(props)
+    producer = new Producer[String, String](config)
+    LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect))
+    LogLog.debug("Logging for topic: " + topic)
   }
   
-  override def append(event: LoggingEvent) = {
-    debug("[" + new Date(event.getTimeStamp).toString + "]" + event.getRenderedMessage +
-            " for " + host + "," + port)
-    val message = encoder.toMessage(event)
-    producer.send(topic, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message))
+  override def append(event: LoggingEvent)  {
+    val message : String = if( this.layout == null) {
+      event.getRenderedMessage
+    }
+    else this.layout.format(event)
+    LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
+    val messageData : ProducerData[String, String] =
+      new ProducerData[String, String](topic, message)
+    producer.send(messageData);
   }
 
-  override def close = {
+  override def close() {
     if(!this.closed) {
       this.closed = true
-      producer.close
+      producer.close()
     }
   }
 

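The rewritten appender is configured through a Topic plus either a BrokerList or a ZkConnect property, with an optional SerializerClass and a standard log4j layout. A minimal programmatic sketch along the lines of the tests further down (broker id, host, port and topic are placeholders):

    import java.util.Properties
    import org.apache.log4j.PropertyConfigurator

    val props = new Properties()
    props.put("log4j.rootLogger", "INFO")
    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
    props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:9092")  // or KAFKA.ZkConnect=<zookeeper connect string>
    props.put("log4j.appender.KAFKA.Topic", "test-topic")
    props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
    PropertyConfigurator.configure(props)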
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala Thu Feb  2 23:40:39 2012
@@ -32,6 +32,10 @@ class ProducerConfig(val props: Properti
   if(brokerList != null && Utils.getString(props, "partitioner.class", null) != null)
     throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set")
 
+  /** If both broker.list and zk.connect options are specified, throw an exception */
+  if(brokerList != null && zkConnect != null)
+    throw new InvalidConfigException("only one of broker.list and zk.connect can be specified")
+
   /** the partitioner class for partitioning events amongst sub-topics */
   val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")
 

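The new check makes broker.list and zk.connect mutually exclusive at construction time. A small sketch of the failure mode (connection strings are placeholders):

    import java.util.Properties
    import kafka.common.InvalidConfigException
    import kafka.producer.ProducerConfig

    val props = new Properties()
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("broker.list", "0:localhost:9092")   // placeholder broker
    props.put("zk.connect", "localhost:2181")      // placeholder zookeeper
    try {
      new ProducerConfig(props)
    } catch {
      case e: InvalidConfigException =>
        // expected: only one of broker.list and zk.connect can be specified
    }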
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala Thu Feb  2 23:40:39 2012
@@ -72,6 +72,9 @@ abstract class IteratorTemplate[T] exten
   
   def remove = 
     throw new UnsupportedOperationException("Removal not supported")
-  
+
+  protected def resetState() {
+    state = NOT_READY
+  }
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Thu Feb  2 23:40:39 2012
@@ -21,6 +21,7 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.consumer.{SimpleConsumer, ConsumerConfig}
 import kafka.cluster.Partition
 import kafka.api.OffsetRequest
+import java.lang.IllegalStateException
 
 /**
  *  A utility that updates the offset of every broker partition to the offset of latest log segment file, in ZK.
@@ -55,7 +56,11 @@ object UpdateOffsetsInZK {
     var numParts = 0
     for (partString <- partitions) {
       val part = Partition.parse(partString)
-      val broker = cluster.getBroker(part.brokerId)
+      val broker = cluster.getBroker(part.brokerId) match {
+        case Some(b) => b
+        case None => throw new IllegalStateException("Broker " + part.brokerId + " is unavailable. Cannot issue " +
+          "getOffsetsBefore request")
+      }
       val consumer = new SimpleConsumer(broker.host, broker.port, 10000, 100 * 1024)
       val offsets = consumer.getOffsetsBefore(topic, part.partId, offsetOption, 1)
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Thu Feb  2 23:40:39 2012
@@ -23,6 +23,7 @@ import kafka.cluster.{Broker, Cluster}
 import scala.collection._
 import java.util.Properties
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
+import kafka.consumer.TopicCount
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -299,6 +300,44 @@ object ZkUtils extends Logging {
     zkClient.delete(brokerPartTopicPath)
   }
 
+  def getConsumersInGroup(zkClient: ZkClient, group: String): Seq[String] = {
+    val dirs = new ZKGroupDirs(group)
+    getChildren(zkClient, dirs.consumerRegistryDir)
+  }
+
+  def getTopicCount(zkClient: ZkClient, group: String, consumerId: String) : TopicCount = {
+    val dirs = new ZKGroupDirs(group)
+    val topicCountJson = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
+    TopicCount.constructTopicCount(consumerId, topicCountJson)
+  }
+
+  def getConsumerTopicMaps(zkClient: ZkClient, group: String): Map[String, TopicCount] = {
+    val dirs = new ZKGroupDirs(group)
+    val consumersInGroup = getConsumersInGroup(zkClient, group)
+    val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,
+      ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)))
+    consumersInGroup.zip(topicCountMaps).toMap
+  }
+
+  def getConsumersPerTopic(zkClient: ZkClient, group: String) : mutable.Map[String, List[String]] = {
+    val dirs = new ZKGroupDirs(group)
+    val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir)
+    val consumersPerTopicMap = new mutable.HashMap[String, List[String]]
+    for (consumer <- consumers) {
+      val topicCount = getTopicCount(zkClient, group, consumer)
+      for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic()) {
+        for (consumerThreadId <- consumerThreadIdSet)
+          consumersPerTopicMap.get(topic) match {
+            case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)
+            case _ => consumersPerTopicMap.put(topic, List(consumerThreadId))
+          }
+      }
+    }
+    for ( (topic, consumerList) <- consumersPerTopicMap )
+      consumersPerTopicMap.put(topic, consumerList.sortWith((s,t) => s < t))
+    consumersPerTopicMap
+  }
+
   /**
    * For a given topic, this returns the sorted list of partition ids registered for this topic
    */

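These helpers, lifted out of ZookeeperConsumerConnector, can now be shared with tools such as the new VerifyConsumerRebalance. A small usage sketch (the zkClient and group are supplied by the caller; the function itself is illustrative only):

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.ZkUtils._

    // summarize a consumer group's registration state in ZK
    def describeGroup(zkClient: ZkClient, group: String) {
      val consumers = getConsumersInGroup(zkClient, group)   // consumer ids under the group's registry dir
      val perTopic  = getConsumersPerTopic(zkClient, group)  // topic -> sorted consumer thread ids
      println("consumers: " + consumers.mkString(", "))
      for ((topic, threadIds) <- perTopic)
        println(topic + " -> " + threadIds.mkString(", "))
    }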
Modified: incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties (original)
+++ incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties Thu Feb  2 23:40:39 2012
@@ -21,4 +21,5 @@ log4j.appender.stdout.layout.ConversionP
 log4j.logger.kafka=OFF
 
 # zkclient can be verbose, during debugging it is common to adjust is separately
-log4j.logger.org.I0Itec.zkclient.ZkClient=OFF
+log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
+log4j.logger.org.apache.zookeeper=WARN

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Thu Feb  2 23:40:39 2012
@@ -61,15 +61,20 @@ class ZookeeperConsumerConnectorTest ext
     }
     val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
     val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
-    try {
-      getMessages(nMessages*2, topicMessageStreams0)
-      fail("should get an exception")
-    }
-    catch {
-      case e: ConsumerTimeoutException => // this is ok
-        println("This is ok")
-      case e => throw e
+
+    // no messages to consume, we should hit timeout;
+    // also the iterator should support re-entrant, so loop it twice
+    for (i <- 0 until  2) {
+      try {
+        getMessages(nMessages*2, topicMessageStreams0)
+        fail("should get an exception")
+      }
+      catch {
+        case e: ConsumerTimeoutException => // this is ok
+        case e => throw e
+      }
     }
+
     zkConsumerConnector0.shutdown
 
     // send some messages to each broker

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala Thu Feb  2 23:40:39 2012
@@ -23,6 +23,8 @@ import java.util.Properties
 import java.io.File
 import kafka.consumer.SimpleConsumer
 import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.TestZKUtils
+import kafka.zk.EmbeddedZookeeper
 import junit.framework.Assert._
 import kafka.api.FetchRequest
 import kafka.serializer.Encoder
@@ -35,30 +37,58 @@ import kafka.utils.{TestUtils, Utils, Lo
 
 class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
-  var logDir: File = null
-  var server: KafkaServer = null
-  val brokerPort: Int = 9092
-  var simpleConsumer: SimpleConsumer = null
+  var logDirZk: File = null
+  var logDirBl: File = null
+  //  var topicLogDir: File = null
+  var serverBl: KafkaServer = null
+  var serverZk: KafkaServer = null
+
+  var simpleConsumerZk: SimpleConsumer = null
+  var simpleConsumerBl: SimpleConsumer = null
+
   val tLogger = Logger.getLogger(getClass())
 
+  private val brokerZk = 0
+  private val brokerBl = 1
+
+  private val ports = TestUtils.choosePorts(2)
+  private val (portZk, portBl) = (ports(0), ports(1))
+
+  private var zkServer:EmbeddedZookeeper = null
+
   @Before
   override def setUp() {
     super.setUp()
-    val config: Properties = createBrokerConfig(1, brokerPort)
-    val logDirPath = config.getProperty("log.dir")
-    logDir = new File(logDirPath)
 
-    server = TestUtils.createServer(new KafkaConfig(config))
+    val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk)
+    val logDirZkPath = propsZk.getProperty("log.dir")
+    logDirZk = new File(logDirZkPath)
+    serverZk = TestUtils.createServer(new KafkaConfig(propsZk));
+
+    val propsBl: Properties = createBrokerConfig(brokerBl, portBl)
+    val logDirBlPath = propsBl.getProperty("log.dir")
+    logDirBl = new File(logDirBlPath)
+    serverBl = TestUtils.createServer(new KafkaConfig(propsBl))
+
     Thread.sleep(100)
-    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
+
+    simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024)
+    simpleConsumerBl = new SimpleConsumer("localhost", portBl, 1000000, 64*1024)
   }
 
   @After
   override def tearDown() {
-    simpleConsumer.close
-    server.shutdown
-    Thread.sleep(100)
-    Utils.rm(logDir)
+    simpleConsumerZk.close
+    simpleConsumerBl.close
+
+    serverZk.shutdown
+    serverBl.shutdown
+    Utils.rm(logDirZk)
+    Utils.rm(logDirBl)
+
+    Thread.sleep(500)
+//    zkServer.shutdown
+//    Thread.sleep(500)
     super.tearDown()
   }
 
@@ -67,9 +97,10 @@ class KafkaLog4jAppenderTest extends JUn
     var props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.Host", "localhost")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.appender.KAFKA.encoder", "kafka.log4j.AppenderStringEncoder")
+    props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
     // port missing
@@ -83,9 +114,10 @@ class KafkaLog4jAppenderTest extends JUn
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.appender.KAFKA.Encoder", "kafka.log4j.AppenderStringEncoder")
-    props.put("log4j.appender.KAFKA.Port", "9092")
+    props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
     // host missing
@@ -99,9 +131,10 @@ class KafkaLog4jAppenderTest extends JUn
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.Host", "localhost")
-    props.put("log4j.appender.KAFKA.Port", "9092")
-    props.put("log4j.appender.KAFKA.Encoder", "kafka.log4j.AppenderStringEncoder")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
+    props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:"+portBl.toString)
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
     // topic missing
@@ -115,30 +148,31 @@ class KafkaLog4jAppenderTest extends JUn
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.Host", "localhost")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:"+portBl.toString)
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.appender.KAFKA.Port", "9092")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
     // serializer missing
     try {
       PropertyConfigurator.configure(props)
     }catch {
-      case e: MissingConfigException => fail("should default to kafka.producer.DefaultStringEncoder")
+      case e: MissingConfigException => fail("should default to kafka.serializer.StringEncoder")
     }
   }
 
   @Test
-  def testLog4jAppends() {
-    PropertyConfigurator.configure(getLog4jConfig)
+  def testBrokerListLog4jAppends() {
+    PropertyConfigurator.configure(getLog4jConfigWithBrokerList)
 
     for(i <- 1 to 5)
       info("test")
 
-    Thread.sleep(500)
+    Thread.sleep(2500)
 
     var offset = 0L
-    val messages = simpleConsumer.fetch(new FetchRequest("test-topic", 0, offset, 1024*1024))
+    val messages = simpleConsumerBl.fetch(new FetchRequest("test-topic", 0, offset, 1024*1024))
 
     var count = 0
     for(message <- messages) {
@@ -149,15 +183,52 @@ class KafkaLog4jAppenderTest extends JUn
     assertEquals(5, count)
   }
 
+  @Test
+  def testZkConnectLog4jAppends() {
+    PropertyConfigurator.configure(getLog4jConfigWithZkConnect)
+
+    for(i <- 1 to 5)
+      info("test")
+
+    Thread.sleep(500)
 
-  private def getLog4jConfig: Properties = {
+    val messages = simpleConsumerZk.fetch(new FetchRequest("test-topic", 0, 0L, 1024*1024))
+
+    var count = 0
+    for(message <- messages) {
+      count = count + 1
+    }
+
+    val messagesFromOtherBroker = simpleConsumerBl.fetch(new FetchRequest("test-topic", 0, 0L, 1024*1024))
+
+    for(message <- messagesFromOtherBroker) {
+      count = count + 1
+    }
+
+    assertEquals(5, count)
+  }
+
+  private def getLog4jConfigWithBrokerList: Properties = {
     var props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.Port", "9092")
-    props.put("log4j.appender.KAFKA.Host", "localhost")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:"+portBl.toString)
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
+    props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
+    props
+  }
+
+  private def getLog4jConfigWithZkConnect: Properties = {
+    var props = new Properties()
+    props.put("log4j.rootLogger", "INFO")
+    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.ZkConnect", TestZKUtils.zookeeperConnect)
+    props.put("log4j.appender.KAFKA.Topic", "test-topic")
+    props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
     props
   }
 

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh Thu Feb  2 23:40:39 2012
@@ -68,7 +68,7 @@ readonly test_start_time="$(date +%s)"
 
 readonly num_msg_per_batch=500
 readonly batches_per_iteration=5
-readonly num_iterations=10
+readonly num_iterations=12
 
 readonly zk_source_port=2181
 readonly zk_mirror_port=2182
@@ -132,6 +132,8 @@ producer_performance_crc_log=$base_dir/p
 producer_performance_crc_sorted_log=$base_dir/producer_performance_crc_sorted.log
 producer_performance_crc_sorted_uniq_log=$base_dir/producer_performance_crc_sorted_uniq.log
 
+consumer_rebalancing_log=$base_dir/consumer_rebalancing_verification.log
+
 consumer_prop_file=$base_dir/config/whitelisttest.consumer.properties
 checksum_diff_log=$base_dir/checksum_diff.log
 
@@ -173,6 +175,17 @@ get_random_range() {
     return $(($(($RANDOM % range)) + $lo))
 }
 
+verify_consumer_rebalancing() {
+
+   info "Verifying consumer rebalancing operation"
+
+    $base_dir/bin/kafka-run-class.sh \
+        kafka.tools.VerifyConsumerRebalance \
+        --zk.connect=localhost:2181 \
+        --group $consumer_grp \
+     2>&1 >> $consumer_rebalancing_log
+}
+
 wait_for_zero_consumer_lags() {
 
     # no of times to check for zero lagging
@@ -618,6 +631,7 @@ start_test() {
                     sleep $wait_time_after_restarting_broker
                 fi
             fi
+            verify_consumer_rebalancing
         else
             info "No bouncing performed"
         fi
@@ -662,6 +676,8 @@ start_console_consumer_for_mirror_produc
 wait_for_zero_source_console_consumer_lags
 wait_for_zero_mirror_console_consumer_lags
 
+verify_consumer_rebalancing
+
 shutdown_servers
 
 cmp_checksum

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/log4j.properties?rev=1239937&r1=1239936&r2=1239937&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/log4j.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/log4j.properties Thu Feb  2 23:40:39 2012
@@ -26,13 +26,15 @@ log4j.appender.stdout.layout.ConversionP
 
 # Turn on all our debugging info
 #log4j.logger.kafka=INFO
-#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+log4j.logger.org.I0Itec.zkclient.ZkClient=INFO
+log4j.logger.org.apache.zookeeper=INFO
 log4j.logger.kafka.consumer=DEBUG
 log4j.logger.kafka.server.EmbeddedConsumer$MirroringThread=TRACE
 log4j.logger.kafka.server.KafkaRequestHandlers=TRACE
 #log4j.logger.kafka.producer.async.AsyncProducer=TRACE
 #log4j.logger.kafka.producer.async.ProducerSendThread=TRACE
 log4j.logger.kafka.producer.async.DefaultEventHandler=TRACE
+log4j.logger.kafka.tools.VerifyConsumerRebalance=DEBUG
 
 # to print message checksum from ProducerPerformance
 log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG 


