kafka-commits mailing list archives

From: jun...@apache.org
Subject: kafka git commit: kafka-1461; Replica fetcher thread does not implement any back-off behavior; patched by Sriharsha Chintalapani; reviewed by Jun Rao
Date: Thu, 12 Mar 2015 20:56:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 01f20e029 -> b7439c808


kafka-1461; Replica fetcher thread does not implement any back-off behavior; patched by Sriharsha Chintalapani; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b7439c80
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b7439c80
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b7439c80

Branch: refs/heads/trunk
Commit: b7439c8081465f8764e88326b3fa7b320f99f130
Parents: 01f20e0
Author: Sriharsha Chintalapani <schintalapani@hortonworks.com>
Authored: Thu Mar 12 13:56:52 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Mar 12 13:56:52 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/consumer/ConsumerFetcherThread.scala     |  3 ++-
 .../main/scala/kafka/server/AbstractFetcherThread.scala  | 11 ++++++++---
 core/src/main/scala/kafka/server/KafkaConfig.scala       |  9 ++++++++-
 .../main/scala/kafka/server/ReplicaFetcherThread.scala   |  1 +
 .../unit/kafka/server/KafkaConfigConfigDefTest.scala     |  1 +
 core/src/test/scala/unit/kafka/utils/TestUtils.scala     |  1 +
 6 files changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b7439c80/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index ee6139c..152fda5 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -29,7 +29,7 @@ class ConsumerFetcherThread(name: String,
                             sourceBroker: Broker,
                             partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
-        extends AbstractFetcherThread(name = name, 
+        extends AbstractFetcherThread(name = name,
                                       clientId = config.clientId,
                                       sourceBroker = sourceBroker,
                                       socketTimeout = config.socketTimeoutMs,
@@ -38,6 +38,7 @@ class ConsumerFetcherThread(name: String,
                                       fetcherBrokerId = Request.OrdinaryConsumerId,
                                       maxWait = config.fetchWaitMaxMs,
                                       minBytes = config.fetchMinBytes,
+                                      fetchBackOffMs = config.refreshLeaderBackoffMs,
                                       isInterruptible = true) {
 
   // process fetched data
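
Note that the consumer-side fetcher introduces no new setting here: it reuses the existing refresh.leader.backoff.ms value (config.refreshLeaderBackoffMs above) as its fetch back-off. A minimal, hypothetical tuning sketch in Scala, assuming the usual java.util.Properties-based consumer configuration; only the property name is taken from the diff:

  // sketch only: raise the back-off used by the consumer fetcher after this patch
  val consumerProps = new java.util.Properties()
  consumerProps.put("refresh.leader.backoff.ms", "1000")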

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7439c80/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 8c281d4..e731df4 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -37,7 +37,7 @@ import com.yammer.metrics.core.Gauge
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
 abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
-                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1,
+                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Int = 0,
                                      isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {
   private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
@@ -66,7 +66,11 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])
 
   override def shutdown(){
-    super.shutdown()
+    initiateShutdown()
+    inLock(partitionMapLock) {
+      partitionMapCond.signalAll()
+    }
+    awaitShutdown()
     simpleConsumer.close()
   }
 
@@ -98,6 +102,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
           warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
           partitionMapLock synchronized {
             partitionsWithError ++= partitionMap.keys
+            // an error occurred while fetching partitions; sleep for a while before retrying
+            partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
           }
         }
     }
@@ -241,4 +247,3 @@ class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
 case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
   override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
 }
-
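
The back-off itself uses the thread's existing lock/condition pair rather than Thread.sleep, so a fetcher parked in the wait can be woken immediately on shutdown; this matters because ReplicaFetcherThread is created with isInterruptible = false and cannot rely on interruption. Below is a standalone Scala sketch of that pattern with a hypothetical BackoffFetcher class and its own lock and condition (the real code uses partitionMapLock and partitionMapCond inside AbstractFetcherThread):

  import java.util.concurrent.TimeUnit
  import java.util.concurrent.locks.ReentrantLock

  // Sketch only: mirrors the await/signalAll pattern added in this patch.
  class BackoffFetcher(fetchBackOffMs: Long) extends Thread {
    private val lock = new ReentrantLock()
    private val cond = lock.newCondition()
    @volatile private var running = true

    override def run(): Unit = {
      while (running) {
        if (!doFetch()) {
          // On a fetch error, wait up to fetchBackOffMs instead of retrying in a tight loop.
          lock.lock()
          try cond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
          finally lock.unlock()
        }
      }
    }

    def shutdown(): Unit = {
      running = false
      lock.lock()
      try cond.signalAll() // wake a thread parked in the back-off wait so shutdown is not delayed
      finally lock.unlock()
      join()
    }

    // Placeholder for the real fetch; returns false to simulate an error.
    private def doFetch(): Boolean = false
  }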

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7439c80/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 48e3362..46d21c7 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -94,6 +94,7 @@ object Defaults {
   val ReplicaFetchWaitMaxMs = 500
   val ReplicaFetchMinBytes = 1
   val NumReplicaFetchers = 1
+  val ReplicaFetchBackoffMs = 1000
   val ReplicaHighWatermarkCheckpointIntervalMs = 5000L
   val FetchPurgatoryPurgeIntervalRequests = 1000
   val ProducerPurgatoryPurgeIntervalRequests = 1000
@@ -199,6 +200,7 @@ object KafkaConfig {
   val ReplicaFetchMaxBytesProp = "replica.fetch.max.bytes"
   val ReplicaFetchWaitMaxMsProp = "replica.fetch.wait.max.ms"
   val ReplicaFetchMinBytesProp = "replica.fetch.min.bytes"
+  val ReplicaFetchBackoffMsProp = "replica.fetch.backoff.ms"
   val NumReplicaFetchersProp = "num.replica.fetchers"
   val ReplicaHighWatermarkCheckpointIntervalMsProp = "replica.high.watermark.checkpoint.interval.ms"
   val FetchPurgatoryPurgeIntervalRequestsProp = "fetch.purgatory.purge.interval.requests"
@@ -311,6 +313,7 @@ object KafkaConfig {
   val ReplicaFetchMinBytesDoc = "Minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs"
   val NumReplicaFetchersDoc = "Number of fetcher threads used to replicate messages from a source broker. " +
     "Increasing this value can increase the degree of I/O parallelism in the follower broker."
+  val ReplicaFetchBackoffMsDoc = "The amount of time to sleep when a fetch partition error occurs."
   val ReplicaHighWatermarkCheckpointIntervalMsDoc = "The frequency with which the high watermark is saved out to disk"
   val FetchPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the fetch request purgatory"
   val ProducerPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the producer request purgatory"
@@ -429,6 +432,7 @@ object KafkaConfig {
       .define(ReplicaSocketReceiveBufferBytesProp, INT, Defaults.ReplicaSocketReceiveBufferBytes, HIGH, ReplicaSocketReceiveBufferBytesDoc)
       .define(ReplicaFetchMaxBytesProp, INT, Defaults.ReplicaFetchMaxBytes, HIGH, ReplicaFetchMaxBytesDoc)
       .define(ReplicaFetchWaitMaxMsProp, INT, Defaults.ReplicaFetchWaitMaxMs, HIGH, ReplicaFetchWaitMaxMsDoc)
+      .define(ReplicaFetchBackoffMsProp, INT, Defaults.ReplicaFetchBackoffMs, atLeast(0), MEDIUM, ReplicaFetchBackoffMsDoc)
       .define(ReplicaFetchMinBytesProp, INT, Defaults.ReplicaFetchMinBytes, HIGH, ReplicaFetchMinBytesDoc)
       .define(NumReplicaFetchersProp, INT, Defaults.NumReplicaFetchers, HIGH, NumReplicaFetchersDoc)
       .define(ReplicaHighWatermarkCheckpointIntervalMsProp, LONG, Defaults.ReplicaHighWatermarkCheckpointIntervalMs, HIGH, ReplicaHighWatermarkCheckpointIntervalMsDoc)
@@ -548,6 +552,7 @@ object KafkaConfig {
       replicaFetchMaxBytes = parsed.get(ReplicaFetchMaxBytesProp).asInstanceOf[Int],
       replicaFetchWaitMaxMs = parsed.get(ReplicaFetchWaitMaxMsProp).asInstanceOf[Int],
       replicaFetchMinBytes = parsed.get(ReplicaFetchMinBytesProp).asInstanceOf[Int],
+      replicaFetchBackoffMs = parsed.get(ReplicaFetchBackoffMsProp).asInstanceOf[Int],
       numReplicaFetchers = parsed.get(NumReplicaFetchersProp).asInstanceOf[Int],
       replicaHighWatermarkCheckpointIntervalMs = parsed.get(ReplicaHighWatermarkCheckpointIntervalMsProp).asInstanceOf[Long],
       fetchPurgatoryPurgeIntervalRequests = parsed.get(FetchPurgatoryPurgeIntervalRequestsProp).asInstanceOf[Int],
@@ -688,6 +693,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
                   val replicaFetchMaxBytes: Int = Defaults.ReplicaFetchMaxBytes,
                   val replicaFetchWaitMaxMs: Int = Defaults.ReplicaFetchWaitMaxMs,
                   val replicaFetchMinBytes: Int = Defaults.ReplicaFetchMinBytes,
+                  val replicaFetchBackoffMs: Int = Defaults.ReplicaFetchBackoffMs,
                   val numReplicaFetchers: Int = Defaults.NumReplicaFetchers,
                   val replicaHighWatermarkCheckpointIntervalMs: Long = Defaults.ReplicaHighWatermarkCheckpointIntervalMs,
                   val fetchPurgatoryPurgeIntervalRequests: Int = Defaults.FetchPurgatoryPurgeIntervalRequests,
@@ -856,6 +862,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
     props.put(ReplicaFetchMaxBytesProp, replicaFetchMaxBytes.toString)
     props.put(ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
     props.put(ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString)
+    props.put(ReplicaFetchBackoffMsProp, replicaFetchBackoffMs.toString)
     props.put(NumReplicaFetchersProp, numReplicaFetchers.toString)
     props.put(ReplicaHighWatermarkCheckpointIntervalMsProp, replicaHighWatermarkCheckpointIntervalMs.toString)
     props.put(FetchPurgatoryPurgeIntervalRequestsProp, fetchPurgatoryPurgeIntervalRequests.toString)
@@ -886,4 +893,4 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
 
     props
   }
-}
\ No newline at end of file
+}
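
For reference, the new broker setting is replica.fetch.backoff.ms: an INT, default 1000 (Defaults.ReplicaFetchBackoffMs), importance MEDIUM, validated with atLeast(0). A hedged Scala sketch of overriding it when building broker properties; the property name, default, and validator come from the diff above, while the surrounding broker setup is assumed:

  // sketch only: back off 2 seconds after a replica fetch error (broker default is 1000 ms; value must be >= 0)
  val brokerProps = new java.util.Properties()
  brokerProps.put("replica.fetch.backoff.ms", "2000")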

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7439c80/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index d6d14fb..96faa7b 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -37,6 +37,7 @@ class ReplicaFetcherThread(name:String,
                                 fetcherBrokerId = brokerConfig.brokerId,
                                 maxWait = brokerConfig.replicaFetchWaitMaxMs,
                                 minBytes = brokerConfig.replicaFetchMinBytes,
+                                fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
                                 isInterruptible = false) {
 
   // process fetched data

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7439c80/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
index c124c8d..191251d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
@@ -120,6 +120,7 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
     Assert.assertEquals(expectedConfig.replicaFetchMaxBytes, actualConfig.replicaFetchMaxBytes)
     Assert.assertEquals(expectedConfig.replicaFetchWaitMaxMs, actualConfig.replicaFetchWaitMaxMs)
     Assert.assertEquals(expectedConfig.replicaFetchMinBytes, actualConfig.replicaFetchMinBytes)
+    Assert.assertEquals(expectedConfig.replicaFetchBackoffMs, actualConfig.replicaFetchBackoffMs)
     Assert.assertEquals(expectedConfig.numReplicaFetchers, actualConfig.numReplicaFetchers)
     Assert.assertEquals(expectedConfig.replicaHighWatermarkCheckpointIntervalMs, actualConfig.replicaHighWatermarkCheckpointIntervalMs)
     Assert.assertEquals(expectedConfig.fetchPurgatoryPurgeIntervalRequests, actualConfig.fetchPurgatoryPurgeIntervalRequests)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7439c80/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 52c7920..1682a77 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -164,6 +164,7 @@ object TestUtils extends Logging {
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put("zookeeper.connect", TestZKUtils.zookeeperConnect)
     props.put("replica.socket.timeout.ms", "1500")
+    props.put("controller.socket.timeout.ms", "1500")
     props.put("controlled.shutdown.enable", enableControlledShutdown.toString)
     props.put("delete.topic.enable", enableDeleteTopic.toString)
     props

