kafka-commits mailing list archives

From jun...@apache.org
Subject [8/37] git commit: Follow up to KAFKA-695: Broker shuts down due to attempt to read a closed index file, reviewed by Neha and Jay
Date Mon, 04 Mar 2013 04:22:03 GMT
Follow up to KAFKA-695: Broker shuts down due to attempt to read a closed index file, reviewed by Neha and Jay


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/814c9709
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/814c9709
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/814c9709

Branch: refs/heads/trunk
Commit: 814c9709c07c514b1a1255e2adb5555c323e6485
Parents: 826f02a
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Tue Feb 5 22:05:28 2013 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Tue Feb 5 22:05:28 2013 -0800

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherThread.scala     |    3 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |   16 ++++--
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   43 +++++++++------
 .../main/scala/kafka/server/RequestPurgatory.scala |    3 +-
 .../scala/kafka/utils/ShutdownableThread.scala     |    5 +-
 5 files changed, 42 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
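
For context: interrupting a thread engaged in java.nio interruptible-channel I/O closes the channel and raises ClosedByInterruptException, so an interrupt delivered by shutdown() while a fetcher is reading or appending a log segment can leave that file closed for every later reader, which is presumably the "closed index file" symptom of KAFKA-695. A minimal standalone sketch of that JVM behavior, not part of the patch (file name and timings are illustrative):

import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.{ClosedByInterruptException, FileChannel}
import java.nio.file.StandardOpenOption

object InterruptClosesChannel extends App {
  val file = File.createTempFile("index", ".tmp")
  file.deleteOnExit()
  val channel = FileChannel.open(file.toPath, StandardOpenOption.READ, StandardOpenOption.WRITE)

  val reader = new Thread(new Runnable {
    override def run(): Unit = {
      val buf = ByteBuffer.allocate(8)
      try {
        while (true) { buf.clear(); channel.read(buf, 0L) } // continuous channel I/O
      } catch {
        case _: ClosedByInterruptException =>
          // The interrupt did not just stop this thread; it closed the shared channel.
          println("channel closed by interrupt, isOpen = " + channel.isOpen)
      }
    }
  })
  reader.start()
  Thread.sleep(100)
  reader.interrupt() // what ShutdownableThread.shutdown() previously did unconditionally
  reader.join()
}

This is why the patch threads an isInterruptible flag through the fetcher hierarchy below: threads that may hold file channels opt out of being interrupted and instead poll a shutdown flag.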


http://git-wip-us.apache.org/repos/asf/kafka/blob/814c9709/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 1135f5d..1dfc75c 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -38,7 +38,8 @@ class ConsumerFetcherThread(name: String,
                                       fetchSize = config.fetchMessageMaxBytes,
                                       fetcherBrokerId = Request.OrdinaryConsumerId,
                                       maxWait = config.fetchWaitMaxMs,
-                                      minBytes = config.fetchMinBytes) {
+                                      minBytes = config.fetchMinBytes,
+                                      isInterruptible = true) {
 
   // process fetched data
   def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/814c9709/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 1ccf578..a7d39b1 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -22,7 +22,6 @@ import kafka.common.{ClientIdAndBroker, TopicAndPartition, ErrorMapping}
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
 import kafka.message.MessageAndOffset
-import kafka.api.{FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
@@ -30,14 +29,16 @@ import kafka.utils.{Pool, ShutdownableThread}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
+import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 
 
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
 abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
-                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
-  extends ShutdownableThread(name) {
+                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1,
+                                     isInterruptible: Boolean = true)
+  extends ShutdownableThread(name, isInterruptible) {
   private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
@@ -72,8 +73,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   override def doWork() {
     partitionMapLock.lock()
     try {
-      while (partitionMap.isEmpty)
-        partitionMapCond.await()
+      if (partitionMap.isEmpty)
+        partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
       partitionMap.foreach {
         case((topicAndPartition, offset)) =>
           fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
@@ -84,6 +85,11 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
     }
 
     val fetchRequest = fetchRequestBuilder.build()
+    if (!fetchRequest.requestInfo.isEmpty)
+      processFetchRequest(fetchRequest)
+  }
+
+  private def processFetchRequest(fetchRequest: FetchRequest) {
     val partitionsWithError = new mutable.HashSet[TopicAndPartition]
     var response: FetchResponse = null
     try {
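
The behavioral core of this hunk is replacing the unbounded partitionMapCond.await() with a 200 ms timed wait (and loosening the while to an if): the thread now returns from doWork() periodically even when it owns no partitions, so ShutdownableThread's run loop can observe the shutdown flag without anyone calling interrupt(). A stripped-down sketch of the same pattern, assuming a trivial queue-based worker (the names here are illustrative, not Kafka's):

import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

class PollingWorker extends Thread("polling-worker") {
  private val running = new AtomicBoolean(true)
  private val lock = new ReentrantLock
  private val nonEmpty = lock.newCondition()
  private val work = mutable.Queue[String]()

  def enqueue(item: String): Unit = {
    lock.lock()
    try { work.enqueue(item); nonEmpty.signalAll() } finally { lock.unlock() }
  }

  override def run(): Unit = {
    while (running.get) {
      lock.lock()
      try {
        if (work.isEmpty)
          nonEmpty.await(200L, TimeUnit.MILLISECONDS) // bounded wait instead of await()
        while (work.nonEmpty)
          println("processing " + work.dequeue())
      } finally { lock.unlock() }
    }
  }

  def shutdown(): Unit = { running.set(false); join() } // no interrupt() required
}

Because the wait can now time out with nothing to fetch, the built request may be empty; hence the new !fetchRequest.requestInfo.isEmpty guard before processFetchRequest.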

http://git-wip-us.apache.org/repos/asf/kafka/blob/814c9709/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index c03f758..37b71be 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -19,8 +19,8 @@ package kafka.server
 
 import kafka.cluster.Broker
 import kafka.message.ByteBufferMessageSet
-import kafka.common.{TopicAndPartition, ErrorMapping}
-import kafka.api.{FetchRequest, PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData}
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData}
+import kafka.common.{KafkaStorageException, TopicAndPartition, ErrorMapping}
 
 class ReplicaFetcherThread(name:String,
                            sourceBroker: Broker,
@@ -34,26 +34,33 @@ class ReplicaFetcherThread(name:String,
                                 fetchSize = brokerConfig.replicaFetchMaxBytes,
                                 fetcherBrokerId = brokerConfig.brokerId,
                                 maxWait = brokerConfig.replicaFetchWaitMaxMs,
-                                minBytes = brokerConfig.replicaFetchMinBytes) {
+                                minBytes = brokerConfig.replicaFetchMinBytes,
+                                isInterruptible = false) {
 
   // process fetched data
   def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
-    val topic = topicAndPartition.topic
-    val partitionId = topicAndPartition.partition
-    val replica = replicaMgr.getReplica(topic, partitionId).get
-    val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+    try {
+      val topic = topicAndPartition.topic
+      val partitionId = topicAndPartition.partition
+      val replica = replicaMgr.getReplica(topic, partitionId).get
+      val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
 
-    if (fetchOffset != replica.logEndOffset)
-      throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset))
-    trace("Follower %d has replica log end offset %d. Received %d messages and leader hw %d"
-          .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes, partitionData.hw))
-    replica.log.get.append(messageSet, assignOffsets = false)
-    trace("Follower %d has replica log end offset %d after appending %d bytes of messages"
-          .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes))
-    val followerHighWatermark = replica.logEndOffset.min(partitionData.hw)
-    replica.highWatermark = followerHighWatermark
-    trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
-          .format(replica.brokerId, topic, partitionId, followerHighWatermark))
+      if (fetchOffset != replica.logEndOffset)
+        throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset))
+      trace("Follower %d has replica log end offset %d. Received %d messages and leader hw %d"
+            .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes, partitionData.hw))
+      replica.log.get.append(messageSet, assignOffsets = false)
+      trace("Follower %d has replica log end offset %d after appending %d bytes of messages"
+            .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes))
+      val followerHighWatermark = replica.logEndOffset.min(partitionData.hw)
+      replica.highWatermark = followerHighWatermark
+      trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
+            .format(replica.brokerId, topic, partitionId, followerHighWatermark))
+    } catch {
+      case e: KafkaStorageException =>
+        fatal("Disk error while replicating data.", e)
+        Runtime.getRuntime.halt(1)
+    }
   }
 
   // handle a partition whose offset is out of range and return a new fetch offset
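
The new catch block is a deliberate fail-fast: if the append hits a disk-level error, the broker halts rather than continuing with a possibly inconsistent replica. Runtime.getRuntime.halt(1), unlike System.exit, does not run shutdown hooks, so nothing else touches the log after the error. A sketch of the idiom in isolation (appendToLog and the IOException are stand-ins; the real code catches Kafka's KafkaStorageException):

object FailFastOnStorageError {
  // Hypothetical stand-in for a log append that can surface a disk error.
  def appendToLog(bytes: Array[Byte]): Unit =
    throw new java.io.IOException("simulated disk error")

  def process(bytes: Array[Byte]): Unit = {
    try {
      appendToLog(bytes)
    } catch {
      case e: java.io.IOException =>
        System.err.println("Disk error while replicating data: " + e.getMessage)
        Runtime.getRuntime.halt(1) // halt, not exit: shutdown hooks are skipped
    }
  }

  def main(args: Array[String]): Unit = process(Array[Byte](1, 2, 3))
}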

http://git-wip-us.apache.org/repos/asf/kafka/blob/814c9709/core/src/main/scala/kafka/server/RequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 3aaf38e..afe9e22 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -85,7 +85,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
 
   /* background thread expiring requests that have been waiting too long */
   private val expiredRequestReaper = new ExpiredRequestReaper
-  private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
+  private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false)
   expirationThread.start()
 
   /**
@@ -241,7 +241,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
     def shutdown() {
       debug("Shutting down.")
       running.set(false)
-      expirationThread.interrupt()
       shutdownLatch.await()
       debug("Shut down complete.")
     }
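
With the interrupt() call removed, the expiration thread must terminate on its own once running is cleared, which only works if its queue poll is timed rather than indefinitely blocking. A sketch of that shutdown contract under the assumption of a DelayQueue-based reaper (DummyDelayed and the 200 ms timeout are illustrative, not taken from the patch):

import java.util.concurrent.{CountDownLatch, DelayQueue, Delayed, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

class DummyDelayed(expiryMs: Long) extends Delayed {
  def getDelay(unit: TimeUnit): Long =
    unit.convert(expiryMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  def compareTo(other: Delayed): Int =
    java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS))
}

class Reaper extends Runnable {
  private val running = new AtomicBoolean(true)
  private val shutdownLatch = new CountDownLatch(1)
  private val queue = new DelayQueue[DummyDelayed]()

  def run(): Unit = {
    while (running.get) {
      val expired = queue.poll(200L, TimeUnit.MILLISECONDS) // wakes up even when empty
      if (expired != null) { /* expire the request */ }
    }
    shutdownLatch.countDown()
  }

  def shutdown(): Unit = { running.set(false); shutdownLatch.await() } // no interrupt()
}

The switch to daemon=false also means the JVM now waits for this thread on exit, which is only safe because shutdown() reliably terminates it.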

http://git-wip-us.apache.org/repos/asf/kafka/blob/814c9709/core/src/main/scala/kafka/utils/ShutdownableThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
index 4cca338..cf8adc9 100644
--- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
+++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
@@ -20,7 +20,7 @@ package kafka.utils
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.CountDownLatch
 
-abstract class ShutdownableThread(val name: String)
+abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true)
         extends Thread(name) with Logging {
   this.setDaemon(false)
   this.logIdent = "[" + name + "], "
@@ -31,7 +31,8 @@ abstract class ShutdownableThread(val name: String)
   def shutdown(): Unit = {
     info("Shutting down")
     isRunning.set(false)
-    interrupt()
+    if (isInterruptible)
+      interrupt()
     shutdownLatch.await()
     info("Shutdown completed")
   }
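
Putting the pieces together, a compact self-contained rendering of the ShutdownableThread contract after this patch (simplified: the real class mixes in Kafka's Logging trait and logs around the latch operations):

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

abstract class SimpleShutdownableThread(name: String, val isInterruptible: Boolean = true)
    extends Thread(name) {
  private val isRunning = new AtomicBoolean(true)
  private val shutdownLatch = new CountDownLatch(1)

  // Subclasses do one bounded unit of work per call; it must not block forever,
  // or a non-interruptible thread would never observe shutdown.
  def doWork(): Unit

  override def run(): Unit = {
    try {
      while (isRunning.get) doWork()
    } finally {
      shutdownLatch.countDown() // always release threads blocked in shutdown()
    }
  }

  def shutdown(): Unit = {
    isRunning.set(false)
    if (isInterruptible) interrupt() // only threads that tolerate interruption
    shutdownLatch.await()
  }
}

In this commit the consumer fetcher keeps isInterruptible = true, while the replica fetcher, whose doWork() appends to log and index files, passes false and relies on the timed condition wait added in AbstractFetcherThread to notice shutdown.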

