kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject svn commit: r1228504 - in /incubator/kafka/trunk: core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/consumer/storage/ core/src/main/scala/kafka/producer/async/ core/src/main/scala/kafka/server/ core/src/test/scala/unit/kafka/integration/ sy...
Date Sat, 07 Jan 2012 00:01:25 GMT
Author: nehanarkhede
Date: Sat Jan  7 00:01:25 2012
New Revision: 1228504

URL: http://svn.apache.org/viewvc?rev=1228504&view=rev
Log:
KAFKA-228 Reduce duplicate messages served by the kafka consumer for uncompressed topics;
patched by nehanarkhede; reviewed by junrao

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
    incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1228504&r1=1228503&r2=1228504&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Sat Jan  7 00:01:25 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -19,24 +19,23 @@ package kafka.consumer
 
 import kafka.utils.{IteratorTemplate, Logging}
 import java.util.concurrent.{TimeUnit, BlockingQueue}
-import kafka.cluster.Partition
-import kafka.message.{MessageAndOffset, MessageSet, Message}
+import kafka.message.MessageAndOffset
 import kafka.serializer.Decoder
+import java.util.concurrent.atomic.AtomicReference
 
 /**
  * An iterator that blocks until a value can be read from the supplied queue.
 * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
- * 
+ *
  */
 class ConsumerIterator[T](private val topic: String,
                           private val channel: BlockingQueue[FetchedDataChunk],
                           consumerTimeoutMs: Int,
                           private val decoder: Decoder[T])
-        extends IteratorTemplate[T] with Logging {
-  
-  private var current: Iterator[MessageAndOffset] = null
-  private var currentDataChunk: FetchedDataChunk = null
-  private var currentTopicInfo: PartitionTopicInfo = null
+  extends IteratorTemplate[T] with Logging {
+
+  private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
+  private var currentTopicInfo:PartitionTopicInfo = null
   private var consumedOffset: Long = -1L
 
   override def next(): T = {
@@ -50,8 +49,10 @@ class ConsumerIterator[T](private val to
   }
 
   protected def makeNext(): T = {
+    var currentDataChunk: FetchedDataChunk = null
     // if we don't have an iterator, get one
-    if(current == null || !current.hasNext) {
+    var localCurrent = current.get()
+    if(localCurrent == null || !localCurrent.hasNext) {
       if (consumerTimeoutMs < 0)
         currentDataChunk = channel.take
       else {
@@ -71,14 +72,21 @@ class ConsumerIterator[T](private val to
                        .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
           currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
         }
-        current = currentDataChunk.messages.iterator
+        localCurrent = currentDataChunk.messages.iterator
+        current.set(localCurrent)
       }
     }
-    val item = current.next()
+    val item = localCurrent.next()
     consumedOffset = item.offset
     decoder.toEvent(item.message)
   }
-  
+
+  def clearCurrentChunk() = {
+    try {
+      info("Clearing the current data chunk for this consumer iterator")
+      current.set(null)
+    }
+  }
 }
 
 class ConsumerTimeoutException() extends RuntimeException()

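The consumer-side heart of this change is that ConsumerIterator now keeps its in-flight chunk behind an AtomicReference, so the rebalance path can drop it from another thread via clearCurrentChunk(). The following is a minimal sketch of that pattern, not the patched class itself; ChunkedIterator and fetchNextChunk are made-up names standing in for ConsumerIterator and channel.take.

import java.util.concurrent.atomic.AtomicReference

// Simplified sketch of the pattern used above: the iterator over the current
// data chunk lives in an AtomicReference so a rebalance thread can clear it
// while the consumer thread keeps calling next().
class ChunkedIterator[T](fetchNextChunk: () => Iterator[T]) {
  private val current = new AtomicReference[Iterator[T]](null)

  def next(): T = {
    // work on a local copy so a concurrent clearCurrentChunk() cannot null
    // the reference out from under this call
    var localCurrent = current.get()
    if (localCurrent == null || !localCurrent.hasNext) {
      localCurrent = fetchNextChunk()   // plays the role of channel.take
      current.set(localCurrent)
    }
    localCurrent.next()
  }

  // Rebalance path: forget the chunk being iterated; whichever consumer owns
  // the partition next will refetch it starting from the committed offset.
  def clearCurrentChunk(): Unit = current.set(null)
}

As in the patch, next() reads the reference into a local variable first, so even if clearCurrentChunk() races with a call to next(), the worst case is that the consumer finishes the element it already pulled and fetches a fresh chunk on the following call.
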
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala?rev=1228504&r1=1228503&r2=1228504&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala Sat Jan  7 00:01:25 2012
@@ -21,37 +21,47 @@ import scala.collection._
 import kafka.cluster._
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.BlockingQueue
+import kafka.utils._
 
 /**
  * The fetcher is a background thread that fetches data from a set of servers
  */
-private[consumer] class Fetcher(val config: ConsumerConfig, val zkClient : ZkClient) {
+private [consumer] class Fetcher(val config: ConsumerConfig, val zkClient : ZkClient) extends Logging {
   private val EMPTY_FETCHER_THREADS = new Array[FetcherRunnable](0)
   @volatile
   private var fetcherThreads : Array[FetcherRunnable] = EMPTY_FETCHER_THREADS
 
   /**
-   *  shutdown all fetch threads
+   *  shutdown all fetcher threads
    */
-  def shutdown() {
+  def stopConnectionsToAllBrokers = {
     // shutdown the old fetcher threads, if any
     for (fetcherThread <- fetcherThreads)
       fetcherThread.shutdown
     fetcherThreads = EMPTY_FETCHER_THREADS
   }
 
-  /**
-   *  Open connections.
-   */
-  def initConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
-                      queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
-    shutdown
+  def clearFetcherQueues[T](topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
+                            queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
+                            kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
+
+    // Clear all but the currently iterated upon chunk in the consumer thread's queue
+    queuesTobeCleared.foreach(_.clear)
+    info("Cleared all relevant queues for this fetcher")
+
+    // Also clear the currently iterated upon chunk in the consumer threads
+    if(kafkaMessageStreams != null)
+       kafkaMessageStreams.foreach(_._2.foreach(s => s.clear()))
 
+    info("Cleared the data chunks in all the consumer message iterators")
+
+  }
+
+  def startConnections[T](topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
+                            kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
     if (topicInfos == null)
       return
 
-    queuesTobeCleared.foreach(_.clear)
-
     // re-arrange by broker id
     val m = new mutable.HashMap[Int, List[PartitionTopicInfo]]
     for(info <- topicInfos) {

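For callers, the old Fetcher API (initConnections doing shutdown, queue clearing and restart in one step, plus a standalone shutdown) is now split into stopConnectionsToAllBrokers, clearFetcherQueues and startConnections, so work such as committing offsets can happen between the "stop" and "start" halves. A rough sketch of how a caller is expected to sequence them, assuming it lives inside package kafka.consumer; the helper and its parameter names are hypothetical, while the three Fetcher methods are the ones introduced here.

import java.util.concurrent.BlockingQueue
import kafka.cluster.Cluster

// Hypothetical helper showing the intended call order for the reworked API.
def restartFetcher[T](fetcher: Fetcher,
                      partitionInfos: Iterable[PartitionTopicInfo],
                      cluster: Cluster,
                      queuesToClear: Iterable[BlockingQueue[FetchedDataChunk]],
                      streams: Map[String, List[KafkaMessageStream[T]]]) {
  fetcher.stopConnectionsToAllBrokers                                         // 1. stop all fetcher threads
  fetcher.clearFetcherQueues(partitionInfos, cluster, queuesToClear, streams) // 2. drop queued and in-flight chunks
  // ... in ZookeeperConsumerConnector, offsets are committed and partition
  // ownership is re-established between these two halves ...
  fetcher.startConnections(partitionInfos, cluster, streams)                  // 3. resume fetching for owned partitions
}

FetcherTest below exercises the same split: it now calls stopConnectionsToAllBrokers and startConnections directly instead of initConnections, passing a null stream map, which clearFetcherQueues explicitly tolerates.
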
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala?rev=1228504&r1=1228503&r2=1228504&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala Sat Jan  7 00:01:25 2012
@@ -18,10 +18,9 @@
 package kafka.consumer
 
 import java.util.concurrent.CountDownLatch
-import java.nio.channels.{ClosedChannelException, ClosedByInterruptException}
-import kafka.common.{OffsetOutOfRangeException, ErrorMapping}
+import kafka.common.ErrorMapping
 import kafka.cluster.{Partition, Broker}
-import kafka.api.{MultiFetchResponse, OffsetRequest, FetchRequest}
+import kafka.api.{OffsetRequest, FetchRequest}
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils._
 import java.io.IOException

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala?rev=1228504&r1=1228503&r2=1228504&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala Sat Jan  7 00:01:25 2012
@@ -18,11 +18,10 @@
 package kafka.consumer
 
 import java.util.concurrent.BlockingQueue
-import kafka.message.Message
-import kafka.serializer.{DefaultDecoder, Decoder}
+import kafka.serializer.Decoder
 
 /**
- * All calls to elements should produce the same thread-safe iterator? Should have a seperate thread
+ * All calls to elements should produce the same thread-safe iterator? Should have a separate thread
  * that feeds messages into a blocking queue for processing.
  */
 class KafkaMessageStream[T](val topic: String,
@@ -38,4 +37,13 @@ class KafkaMessageStream[T](val topic: S
    *  Create an iterator over messages in the stream.
    */
   def iterator(): ConsumerIterator[T] = iter
+
+  /**
+   * This method clears the queue being iterated during the consumer rebalancing. This is mainly
+   * to reduce the number of duplicates received by the consumer
+   */
+  def clear() {
+    iter.clearCurrentChunk()
+  }
+
 }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1228504&r1=1228503&r2=1228504&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Sat Jan  7 00:01:25 2012
@@ -17,7 +17,6 @@
 
 package kafka.consumer
 
-import java.nio.channels._
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import kafka.message._

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1228504&r1=1228503&r2=1228504&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Sat Jan  7 00:01:25 2012
@@ -18,11 +18,8 @@
 package kafka.consumer
 
 import java.net._
-import java.nio._
 import java.nio.channels._
-import java.util.concurrent.atomic._
 import kafka.api._
-import kafka.common._
 import kafka.message._
 import kafka.network._
 import kafka.utils._

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1228504&r1=1228503&r2=1228504&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Sat Jan  7 00:01:25 2012
@@ -93,6 +93,7 @@ private[kafka] class ZookeeperConsumerCo
   // queues : (topic,consumerThreadId) -> queue
   private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
+
   connectZk()
   createFetcher()
   if (config.autoCommit) {
@@ -125,10 +126,10 @@ private[kafka] class ZookeeperConsumerCo
       try {
         scheduler.shutdownNow()
         fetcher match {
-          case Some(f) => f.shutdown()
+          case Some(f) => f.stopConnectionsToAllBrokers
           case None =>
         }
-        sendShudownToAllQueues()
+        sendShutdownToAllQueues()
         if (config.autoCommit)
           commitOffsets()
         if (zkClient != null) {
@@ -167,16 +168,6 @@ private[kafka] class ZookeeperConsumerCo
     val consumerIdString = config.groupId + "_" + consumerUuid
     val topicCount = new TopicCount(consumerIdString, topicCountMap)
 
-    // listener to consumer and partition changes
-    val loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString)
-    registerConsumerInZK(dirs, consumerIdString, topicCount)
-
-    // register listener for session expired event
-    zkClient.subscribeStateChanges(
-      new ZKSessionExpireListenner(dirs, consumerIdString, topicCount, loadBalancerListener))
-
-    zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
-
     // create a queue per topic per consumer thread
     val consumerThreadIdsPerTopic = topicCount.getConsumerThreadIdsPerTopic
     for ((topic, threadIdSet) <- consumerThreadIdsPerTopic) {
@@ -188,9 +179,21 @@ private[kafka] class ZookeeperConsumerCo
       }
       ret += (topic -> streamList)
       debug("adding topic " + topic + " and stream to map..")
+    }
+
+    // listener to consumer and partition changes
+    val loadBalancerListener = new ZKRebalancerListener[T](config.groupId, consumerIdString, ret)
+    registerConsumerInZK(dirs, consumerIdString, topicCount)
+
+    // register listener for session expired event
+    zkClient.subscribeStateChanges(
+      new ZKSessionExpireListener[T](dirs, consumerIdString, topicCount, loadBalancerListener))
+
+    zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
 
+    ret.foreach { topicAndStreams =>
       // register on broker partition path changes
-      val partitionPath = ZkUtils.BrokerTopicsPath + "/" + topic
+      val partitionPath = ZkUtils.BrokerTopicsPath + "/" + topicAndStreams._1
       zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
     }
 
@@ -205,7 +208,7 @@ private[kafka] class ZookeeperConsumerCo
     info("end registering consumer " + consumerIdString + " in ZK")
   }
 
-  private def sendShudownToAllQueues() = {
+  private def sendShutdownToAllQueues() = {
     for (queue <- queues.values) {
       debug("Clearing up queue")
       queue.clear()
@@ -227,8 +230,10 @@ private[kafka] class ZookeeperConsumerCo
   }
 
   def commitOffsets() {
-    if (zkClient == null)
+    if (zkClient == null) {
+      error("zk client is null. Cannot commit offsets")
       return
+    }
     for ((topic, infos) <- topicRegistry) {
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
       for (info <- infos.values) {
@@ -322,10 +327,10 @@ private[kafka] class ZookeeperConsumerCo
     producedOffset
   }
 
-  class ZKSessionExpireListenner(val dirs: ZKGroupDirs,
+  class ZKSessionExpireListener[T](val dirs: ZKGroupDirs,
                                  val consumerIdString: String,
                                  val topicCount: TopicCount,
-                                 val loadBalancerListener: ZKRebalancerListener)
+                                 val loadBalancerListener: ZKRebalancerListener[T])
     extends IZkStateListener {
     @throws(classOf[Exception])
     def handleStateChanged(state: KeeperState) {
@@ -358,7 +363,8 @@ private[kafka] class ZookeeperConsumerCo
 
   }
 
-  class ZKRebalancerListener(val group: String, val consumerIdString: String)
+  class ZKRebalancerListener[T](val group: String, val consumerIdString: String,
+                                kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]])
     extends IZkChildListener {
     private val dirs = new ZKGroupDirs(group)
     private var oldPartitionsPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
@@ -369,7 +375,7 @@ private[kafka] class ZookeeperConsumerCo
       syncedRebalance
     }
 
-    private def releasePartitionOwnership() {
+    private def releasePartitionOwnership()= {
       for ((topic, infos) <- topicRegistry) {
         val topicDirs = new ZKGroupTopicDirs(group, topic)
         for(partition <- infos.keys) {
@@ -441,7 +447,6 @@ private[kafka] class ZookeeperConsumerCo
             return
           // release all partitions, reset state and retry
           releasePartitionOwnership()
-          resetState()
           Thread.sleep(config.rebalanceBackoffMs)
         }
       }
@@ -460,13 +465,15 @@ private[kafka] class ZookeeperConsumerCo
         return true
       }
 
-      info("Committing all offsets")
-      commitOffsets
+      // fetchers must be stopped to avoid data duplication, since if the current
+      // rebalancing attempt fails, the partitions that are released could be owned by another consumer.
+      // But if we don't stop the fetchers first, this consumer would continue returning data for released
+      // partitions in parallel. So, not stopping the fetchers leads to duplicate data.
+      closeFetchers(cluster, kafkaMessageStreams, relevantTopicThreadIdsMap)
 
       info("Releasing partition ownership")
       releasePartitionOwnership()
 
-      val queuesToBeCleared = new mutable.HashSet[BlockingQueue[FetchedDataChunk]]
       for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
         topicRegistry.remove(topic)
         topicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo])
@@ -503,17 +510,38 @@ private[kafka] class ZookeeperConsumerCo
               else
                 return false
             }
-            queuesToBeCleared += queues.get((topic, consumerThreadId))
           }
         }
       }
-      updateFetcher(cluster, queuesToBeCleared)
+      updateFetcher(cluster, kafkaMessageStreams)
       oldPartitionsPerTopicMap = partitionsPerTopicMap
       oldConsumersPerTopicMap = consumersPerTopicMap
       true
     }
 
-    private def updateFetcher(cluster: Cluster, queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
+    private def closeFetchers(cluster: Cluster, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+                              relevantTopicThreadIdsMap: Map[String, Set[String]]) {
+      // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
+      // after this rebalancing attempt
+      val queuesTobeCleared = queues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
+      var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
+      fetcher match {
+        case Some(f) => f.stopConnectionsToAllBrokers
+        f.clearFetcherQueues(allPartitionInfos, cluster, queuesTobeCleared, kafkaMessageStreams)
+        info("Committing all offsets after clearing the fetcher queues")
+        // here, we need to commit offsets before stopping the consumer from returning any more messages
+        // from the current data chunk. Since partition ownership is not yet released, this commit offsets
+        // call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
+        // for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
+        // by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
+        // successfully and the fetchers restart to fetch more data chunks
+        commitOffsets
+        case None =>
+      }
+    }
+
+    private def updateFetcher[T](cluster: Cluster,
+                                 kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
       // update partitions for fetcher
       var allPartitionInfos : List[PartitionTopicInfo] = Nil
       for (partitionInfos <- topicRegistry.values)
@@ -523,7 +551,8 @@ private[kafka] class ZookeeperConsumerCo
         allPartitionInfos.sortWith((s,t) => s.partition < t.partition).map(_.toString).mkString(","))
 
       fetcher match {
-        case Some(f) => f.initConnections(allPartitionInfos, cluster, queuesTobeCleared)
+        case Some(f) =>
+          f.startConnections(allPartitionInfos, cluster, kafkaMessageStreams)
         case None =>
       }
     }

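The comments added to closeFetchers above carry the core reasoning of the patch: stop the fetchers and drop the buffered chunks first, commit offsets while this consumer still owns the partitions, and only then release ownership, so the next owner resumes exactly at the committed offset instead of re-reading data this consumer was still draining. A condensed, hypothetical view of that ordering follows; the wrapper and its function parameters are invented, while the Fetcher calls are the patched ones.

import java.util.concurrent.BlockingQueue
import kafka.cluster.Cluster

// Hypothetical condensation of one rebalance attempt as ordered by this patch;
// the real logic lives in ZKRebalancerListener.rebalance/closeFetchers above.
def quiesceThenRelease[T](fetcher: Option[Fetcher],
                          cluster: Cluster,
                          partitionInfos: Iterable[PartitionTopicInfo],
                          queuesToClear: Iterable[BlockingQueue[FetchedDataChunk]],
                          streams: Map[String, List[KafkaMessageStream[T]]],
                          commitOffsets: () => Unit,
                          releaseOwnership: () => Unit) {
  fetcher match {
    case Some(f) =>
      f.stopConnectionsToAllBrokers                                       // no new chunks arrive
      f.clearFetcherQueues(partitionInfos, cluster, queuesToClear, streams) // queued and in-flight chunks dropped
      commitOffsets()                                                     // offsets reflect what was actually handed out
    case None =>
  }
  releaseOwnership()                                                      // only now may another consumer claim the partitions
}

For a concrete picture, suppose a consumer had fetched a chunk covering offsets 100-199 and handed out messages up to 149 when a rebalance starts: previously it would keep draining 150-199 from that chunk while the new owner refetched from the committed offset, so those messages were delivered twice; with this ordering the chunk is dropped and 150-199 are served only by the new owner.
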
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala?rev=1228504&r1=1228503&r2=1228504&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala Sat Jan  7 00:01:25 2012
@@ -17,7 +17,6 @@
 
 package kafka.consumer.storage
 
-import kafka.utils.Range
 
 /**
  * A method for storing offsets for the consumer. 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1228504&r1=1228503&r2=1228504&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Sat Jan  7 00:01:25 2012
@@ -48,7 +48,8 @@ private[kafka] class DefaultEventHandler
     if(messagesPerTopic.size > 0) {
       val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
       syncProducer.multiSend(requests)
-      trace("kafka producer sent messages for topics " + messagesPerTopic)
+      trace("kafka producer sent messages for topics %s to broker %s:%d"
+        .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port))
     }
   }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala?rev=1228504&r1=1228503&r2=1228504&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala Sat Jan  7 00:01:25 2012
@@ -106,7 +106,7 @@ private[kafka] class KafkaRequestHandler
   private def readMessageSet(fetchRequest: FetchRequest): MessageSetSend = {
     var  response: MessageSetSend = null
     try {
-      trace("Fetching log segment for topic = " + fetchRequest.topic + " and partition = " + fetchRequest.partition)
+      trace("Fetching log segment for topic, partition, offset, maxSize = " + fetchRequest)
       val log = logManager.getLog(fetchRequest.topic, fetchRequest.partition)
       if (log != null)
         response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize))

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1228504&r1=1228503&r2=1228504&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Sat Jan  7 00:01:25 2012
@@ -55,11 +55,12 @@ class FetcherTest extends JUnit3Suite wi
   override def setUp() {
     super.setUp
     fetcher = new Fetcher(new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), null)
-    fetcher.initConnections(topicInfos, cluster, Set(queue))
+    fetcher.stopConnectionsToAllBrokers
+    fetcher.startConnections(topicInfos, cluster, null)
   }
 
   override def tearDown() {
-    fetcher.shutdown
+    fetcher.stopConnectionsToAllBrokers
     super.tearDown
   }
     

Modified: incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh?rev=1228504&r1=1228503&r2=1228504&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh (original)
+++ incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh Sat Jan  7 00:01:25 2012
@@ -494,6 +494,8 @@ cmp_checksum() {
 
     total_msg_published=`cat $producer_performance_crc_log | wc -l | tr -d ' '`
 
+    duplicate_msg_in_producer=$(( $total_msg_published - $uniq_msg_count_from_producer ))
+
     crc_only_in_mirror_consumer=`comm -23 $console_consumer_mirror_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log`
     crc_only_in_source_consumer=`comm -13 $console_consumer_mirror_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log`
     crc_common_in_both_consumer=`comm -12 $console_consumer_mirror_crc_sorted_uniq_log $console_consumer_source_crc_sorted_uniq_log`
@@ -502,7 +504,8 @@ cmp_checksum() {
 
     duplicate_mirror_crc=`comm -23 $console_consumer_mirror_crc_sorted_log $console_consumer_mirror_crc_sorted_uniq_log`

    no_of_duplicate_msg=$(( $msg_count_from_mirror_consumer - $uniq_msg_count_from_mirror_consumer \
-                          + $msg_count_from_source_consumer - $uniq_msg_count_from_source_consumer ))
+                          + $msg_count_from_source_consumer - $uniq_msg_count_from_source_consumer - \
+                          2*$duplicate_msg_in_producer ))
 
     echo ""
     echo "========================================================"
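
The new accounting in cmp_checksum() appears intended to keep producer-side duplicates from being counted against the consumers: a message the producer published twice is naturally read twice by both the source and the mirror console consumer, so it is discounted once per consumer. A quick numeric check of that bookkeeping, written here in Scala with made-up counts:

// Hypothetical counts mirroring the variables in cmp_checksum():
val totalMsgPublished        = 1003
val uniqMsgCountFromProducer = 1000
val duplicateMsgInProducer   = totalMsgPublished - uniqMsgCountFromProducer   // 3

// Each consumer re-reads the 3 producer duplicates as well:
val msgCountFromMirrorConsumer = 1003; val uniqMsgCountFromMirrorConsumer = 1000
val msgCountFromSourceConsumer = 1003; val uniqMsgCountFromSourceConsumer = 1000

val noOfDuplicateMsg =
  (msgCountFromMirrorConsumer - uniqMsgCountFromMirrorConsumer) +
  (msgCountFromSourceConsumer - uniqMsgCountFromSourceConsumer) -
  2 * duplicateMsgInProducer                                                  // == 0, nothing duplicated downstream
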
@@ -649,15 +652,15 @@ cleanup
 sleep 5
 
 # Ctrl-c trap. Catches INT signal
-trap "shutdown_producer; shutdown_servers; exit 0" INT
+trap "shutdown_producer; shutdown_servers, cmp_checksum; exit 0" INT
 
 start_test
 
 start_console_consumer_for_source_producer
 start_console_consumer_for_mirror_producer
 
-wait_for_zero_mirror_console_consumer_lags
 wait_for_zero_source_console_consumer_lags
+wait_for_zero_mirror_console_consumer_lags
 
 shutdown_servers
 


