kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: KAFKA-5767; Kafka server should halt if IBP < 1.0.0 and there is log directory failure
Date Thu, 05 Oct 2017 00:12:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6bcbd17d3 -> 20d9adb17


KAFKA-5767; Kafka server should halt if IBP < 1.0.0 and there is log directory failure

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #3718 from lindong28/KAFKA-5767


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/20d9adb1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/20d9adb1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/20d9adb1

Branch: refs/heads/trunk
Commit: 20d9adb173a0cde010a2502da7415ecda9bcaa80
Parents: 6bcbd17
Author: Dong Lin <lindong28@gmail.com>
Authored: Wed Oct 4 17:12:53 2017 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Oct 4 17:12:53 2017 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherThread.scala  |   3 +-
 core/src/main/scala/kafka/log/LogManager.scala  |   7 +-
 .../kafka/server/AbstractFetcherManager.scala   |   5 +-
 .../kafka/server/AbstractFetcherThread.scala    |   4 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   2 +-
 .../kafka/server/ReplicaFetcherThread.scala     |   4 +-
 .../scala/kafka/server/ReplicaManager.scala     |   4 +
 core/src/main/scala/kafka/utils/Exit.scala      |   2 +-
 .../scala/kafka/utils/ShutdownableThread.scala  |   6 +-
 .../kafka/api/LogDirFailureTest.scala           | 187 ----------------
 .../integration/UncleanLeaderElectionTest.scala |  14 +-
 .../unit/kafka/server/LogDirFailureTest.scala   | 216 +++++++++++++++++++
 .../test/scala/unit/kafka/utils/TestUtils.scala |   4 +-
 13 files changed, 251 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/20d9adb1/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 4c7c227..d5e084e 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -20,7 +20,7 @@ package kafka.consumer
 import kafka.api.{FetchRequestBuilder, FetchResponsePartitionData, OffsetRequest, Request}
 import kafka.cluster.BrokerEndPoint
 import kafka.message.ByteBufferMessageSet
-import kafka.server.{AbstractFetcherThread, PartitionFetchState}
+import kafka.server.{AbstractFetcherThread, PartitionFetchState, ResultWithPartitions}
 import kafka.common.{ErrorMapping, TopicAndPartition}
 
 import scala.collection.Map
@@ -30,6 +30,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.EpochEndOffset
 
+
 @deprecated("This class has been deprecated and will be removed in a future release. " +
             "Please use org.apache.kafka.clients.consumer.internals.Fetcher instead.", "0.11.0.0")
 class ConsumerFetcherThread(name: String,

http://git-wip-us.apache.org/repos/asf/kafka/blob/20d9adb1/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index f4bd8a2..102b1e5 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -131,8 +131,11 @@ class LogManager(logDirs: Array[File],
 
     val liveLogDirs = new ConcurrentLinkedQueue[File]()
 
-    for (dir <- dirs if !initialOfflineDirs.contains(dir)) {
+    for (dir <- dirs) {
       try {
+        if (initialOfflineDirs.contains(dir))
+          throw new IOException(s"Failed to load ${dir.getAbsolutePath} during broker startup")
+
         if (!dir.exists) {
           info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
           val created = dir.mkdirs()
@@ -144,7 +147,7 @@ class LogManager(logDirs: Array[File],
         liveLogDirs.add(dir)
       } catch {
         case e: IOException =>
-          error(s"Failed to create or validate data directory $dir.getAbsolutePath", e)
+          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Failed to create or validate data directory ${dir.getAbsolutePath}", e)
       }
     }
     if (liveLogDirs.isEmpty) {
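
For illustration only: with this hunk, a directory listed in initialOfflineDirs is no longer silently skipped; it is reported through logDirFailureChannel.maybeAddOfflineLogDir like any other startup failure, so a single handler thread can react to it. A minimal sketch of the shape of such a channel follows; the class name and internals below are assumptions for the example, not the committed kafka.server.LogDirFailureChannel.

import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}

// Illustrative stand-in for the failure channel used above (name and internals assumed).
class IllustrativeLogDirFailureChannel(logDirNum: Int) {
  private val offlineLogDirs = new ConcurrentHashMap[String, String]()
  private val offlineLogDirQueue = new ArrayBlockingQueue[String](logDirNum)

  // Writers (e.g. LogManager) report a failed dir; only the first report per dir is enqueued.
  def maybeAddOfflineLogDir(logDir: String, msg: String, e: Throwable): Unit =
    if (offlineLogDirs.putIfAbsent(logDir, logDir) == null)
      offlineLogDirQueue.add(logDir)

  // A single handler thread blocks here and reacts to each newly offline dir.
  def takeNextOfflineLogDir(): String = offlineLogDirQueue.take()
}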

http://git-wip-us.apache.org/repos/asf/kafka/blob/20d9adb1/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 0d7806c..a8316b4 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -29,8 +29,9 @@ import org.apache.kafka.common.utils.Utils
 
 abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1)
   extends Logging with KafkaMetricsGroup {
-  // map of (source broker_id, fetcher_id per source broker) => fetcher
-  val fetcherThreadMap = new mutable.HashMap[BrokerIdAndFetcherId, AbstractFetcherThread]
+  // map of (source broker_id, fetcher_id per source broker) => fetcher.
+  // package private for test
+  private[server] val fetcherThreadMap = new mutable.HashMap[BrokerIdAndFetcherId, AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = "[" + name + "] "
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/20d9adb1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index e772ac3..5df6732 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -82,8 +82,6 @@ abstract class AbstractFetcherThread(name: String,
 
   protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[REQ]
 
-  case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])
-
   protected def fetch(fetchRequest: REQ): Seq[(TopicPartition, PD)]
 
   override def shutdown(){
@@ -420,3 +418,5 @@ case class PartitionFetchState(fetchOffset: Long, delay: DelayedItem, truncating

   override def toString = "offset:%d-isReadyForFetch:%b-isTruncatingLog:%b".format(fetchOffset, isReadyForFetch, truncatingLog)
 }
+
+case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])

http://git-wip-us.apache.org/repos/asf/kafka/blob/20d9adb1/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f8af7a2..f2d4b16 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -137,7 +137,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   var zkUtils: ZkUtils = null
   val correlationId: AtomicInteger = new AtomicInteger(0)
   val brokerMetaPropsFile = "meta.properties"
-  val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap
+  val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
 
   private var _clusterId: String = null
   private var _brokerTopicStats: BrokerTopicStats = null

http://git-wip-us.apache.org/repos/asf/kafka/blob/20d9adb1/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index b90e9e8..d422112 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -240,7 +240,9 @@ class ReplicaFetcherThread(name: String,
           val logStartOffset = replicaMgr.getReplicaOrException(topicPartition).logStartOffset
           requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize))
         } catch {
-          case e: KafkaStorageException =>
+          case _: KafkaStorageException =>
+            // The replica has already been marked offline due to log directory failure and the original failure should have already been logged.
+            // This partition should be removed from ReplicaFetcherThread soon by ReplicaManager.handleLogDirFailure()
             partitionsWithError += topicPartition
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/20d9adb1/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 98a4be1..f4f3672 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -318,6 +318,10 @@ class ReplicaManager(val config: KafkaConfig,
     // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
     scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
     scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)
+
+    // If inter-broker protocol (IBP) < 1.0, the controller will send LeaderAndIsrRequest V0, which does not include the isNew field.
+    // In this case, the broker receiving the request cannot determine whether it is safe to create a partition if a log directory has failed.
+    // Thus, we choose to halt the broker on any log directory failure if IBP < 1.0.
     val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0
     logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
     logDirFailureHandler.start()
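
The comment above captures the core decision of this patch. The sketch below is illustrative only: the object and helper names are assumptions, and only the halt-vs-mark-offline branch mirrors how the haltBrokerOnFailure flag is meant to be applied once a log directory failure is observed.

// Sketch only: how haltBrokerOnFailure is consumed when a dir failure is reported.
object LogDirFailurePolicySketch {
  def onOfflineLogDir(offlineDir: String,
                      haltBrokerOnFailure: Boolean,
                      markDirOffline: String => Unit): Unit = {
    if (haltBrokerOnFailure) {
      // IBP < 1.0: LeaderAndIsrRequest V0 has no isNew field, so the broker cannot
      // tell whether recreating replicas on the surviving dirs is safe; halt instead.
      kafka.utils.Exit.halt(1)
    } else {
      // IBP >= 1.0: take only the failed directory offline and keep serving the rest.
      markDirOffline(offlineDir)
    }
  }
}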

http://git-wip-us.apache.org/repos/asf/kafka/blob/20d9adb1/core/src/main/scala/kafka/utils/Exit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Exit.scala b/core/src/main/scala/kafka/utils/Exit.scala
index 3e29ddd..5819e97 100644
--- a/core/src/main/scala/kafka/utils/Exit.scala
+++ b/core/src/main/scala/kafka/utils/Exit.scala
@@ -38,7 +38,7 @@ object Exit {
     JExit.setExitProcedure(functionToProcedure(exitProcedure))
 
   def setHaltProcedure(haltProcedure: (Int, Option[String]) => Nothing): Unit =
-    JExit.setExitProcedure(functionToProcedure(haltProcedure))
+    JExit.setHaltProcedure(functionToProcedure(haltProcedure))
 
   def resetExitProcedure(): Unit =
     JExit.resetExitProcedure()
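
The one-line fix above matters because setHaltProcedure previously installed the custom procedure as the exit procedure, so halts could not be intercepted in tests. A hedged usage sketch mirroring the new LogDirFailureTest further down (the object and helper names here are assumptions for the example):

import kafka.utils.Exit

object HaltCaptureExample {
  // Hypothetical helper: run some startup logic and report the captured halt status code, if any.
  def expectBrokerHalt(startBroker: () => Unit): Option[Int] = {
    @volatile var haltStatusCode: Option[Int] = None
    Exit.setHaltProcedure { (statusCode, _) =>
      haltStatusCode = Some(statusCode)
      throw new IllegalArgumentException // stop the halting thread without killing the JVM
    }
    try startBroker()
    catch { case _: IllegalArgumentException => () } // thrown by the injected procedure above
    finally Exit.resetHaltProcedure()
    haltStatusCode
  }
}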

http://git-wip-us.apache.org/repos/asf/kafka/blob/20d9adb1/core/src/main/scala/kafka/utils/ShutdownableThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
index 6ed0968..0922d15 100644
--- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
+++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
@@ -27,13 +27,17 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
   this.setDaemon(false)
   this.logIdent = "[" + name + "]: "
   val isRunning: AtomicBoolean = new AtomicBoolean(true)
-  val shutdownLatch = new CountDownLatch(1)
+  private val shutdownLatch = new CountDownLatch(1)
 
   def shutdown(): Unit = {
     initiateShutdown()
     awaitShutdown()
   }
 
+  def isShutdownComplete: Boolean = {
+    shutdownLatch.getCount == 0
+  }
+
   def initiateShutdown(): Boolean = {
     if (isRunning.compareAndSet(true, false)) {
       info("Shutting down")

http://git-wip-us.apache.org/repos/asf/kafka/blob/20d9adb1/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
deleted file mode 100644
index b1ac47b..0000000
--- a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import java.util.Collections
-import java.util.concurrent.{ExecutionException, TimeUnit}
-
-import kafka.controller.{OfflineReplica, PartitionAndReplica}
-import kafka.server.KafkaConfig
-import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderForPartitionException}
-import org.junit.{Before, Test}
-import org.junit.Assert.assertTrue
-import org.junit.Assert.assertEquals
-
-import scala.collection.JavaConverters._
-
-/**
-  * Test whether clients can producer and consume when there is log directory failure
-  */
-class LogDirFailureTest extends IntegrationTestHarness {
-
-  import kafka.api.LogDirFailureTest._
-
-  val producerCount: Int = 1
-  val consumerCount: Int = 1
-  val serverCount: Int = 2
-  private val topic = "topic"
-  private val partitionNum = 12
-
-  this.logDirCount = 3
-  this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
-  this.producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
-  this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "60000")
-  this.serverConfig.setProperty(KafkaConfig.NumReplicaFetchersProp, "1")
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    TestUtils.createTopic(zkUtils, topic, partitionNum, serverCount, servers = servers)
-  }
-
-  @Test
-  def testIOExceptionDuringLogRoll() {
-    testProduceAfterLogDirFailureOnLeader(Roll)
-  }
-
-  @Test
-  def testIOExceptionDuringCheckpoint() {
-    testProduceAfterLogDirFailureOnLeader(Checkpoint)
-  }
-
-  @Test
-  def testReplicaFetcherThreadAfterLogDirFailureOnFollower() {
-    val producer = producers.head
-    val partition = new TopicPartition(topic, 0)
-
-    val partitionInfo = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get
-    val leaderServerId = partitionInfo.leader().id()
-    val leaderServer = servers.find(_.config.brokerId == leaderServerId).get
-    val followerServerId = partitionInfo.replicas().map(_.id()).find(_ != leaderServerId).get
-    val followerServer = servers.find(_.config.brokerId == followerServerId).get
-
-    followerServer.replicaManager.markPartitionOffline(partition)
-    // Send a message to another partition whose leader is the same as partition 0
-    // so that ReplicaFetcherThread on the follower will get response from leader immediately
-    val anotherPartitionWithTheSameLeader = (1 until partitionNum).find { i =>
-      leaderServer.replicaManager.getPartition(new TopicPartition(topic, i)).flatMap(_.leaderReplicaIfLocal).isDefined
-    }.get
-    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, anotherPartitionWithTheSameLeader, topic.getBytes, "message".getBytes)
-    // When producer.send(...).get returns, it is guaranteed that ReplicaFetcherThread on the follower
-    // has fetched from the leader and attempts to append to the offline replica.
-    producer.send(record).get
-
-    assertEquals(serverCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size)
-    followerServer.replicaManager.replicaFetcherManager.fetcherThreadMap.values.foreach { thread =>
-      assertTrue("ReplicaFetcherThread should still be working if its partition count > 0", thread.shutdownLatch.getCount > 0)
-    }
-  }
-
-  def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType) {
-    val consumer = consumers.head
-    subscribeAndWaitForAssignment(topic, consumer)
-    val producer = producers.head
-    val partition = new TopicPartition(topic, 0)
-    val record = new ProducerRecord(topic, 0, s"key".getBytes, s"value".getBytes)
-
-    val leaderServerId = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id()
-    val leaderServer = servers.find(_.config.brokerId == leaderServerId).get
-
-    // The first send() should succeed
-    producer.send(record).get()
-    TestUtils.waitUntilTrue(() => {
-      consumer.poll(0).count() == 1
-    }, "Expected the first message", 3000L)
-
-    // Make log directory of the partition on the leader broker inaccessible by replacing it with a file
-    val replica = leaderServer.replicaManager.getReplicaOrException(partition)
-    val logDir = replica.log.get.dir.getParentFile
-    CoreUtils.swallow(Utils.delete(logDir))
-    logDir.createNewFile()
-    assertTrue(logDir.isFile)
-
-    if (failureType == Roll) {
-      try {
-        leaderServer.replicaManager.getLog(partition).get.roll()
-        fail("Log rolling should fail with KafkaStorageException")
-      } catch {
-        case e: KafkaStorageException => // This is expected
-      }
-    } else if (failureType == Checkpoint) {
-      leaderServer.replicaManager.checkpointHighWatermarks()
-    }
-
-    // Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline
-    TestUtils.waitUntilTrue(() => !leaderServer.logManager.isLogDirOnline(logDir.getAbsolutePath), "Expected log directory offline", 3000L)
-    assertTrue(leaderServer.replicaManager.getReplica(partition).isEmpty)
-
-    // The second send() should fail due to either KafkaStorageException or NotLeaderForPartitionException
-    try {
-      producer.send(record).get(6000, TimeUnit.MILLISECONDS)
-      fail("send() should fail with either KafkaStorageException or NotLeaderForPartitionException")
-    } catch {
-      case e: ExecutionException =>
-        e.getCause match {
-          case t: KafkaStorageException =>
-          case t: NotLeaderForPartitionException => // This may happen if ProduceRequest version <= 3
-          case t: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException instead of ${t.toString}")
-        }
-      case e: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException instead of ${e.toString}")
-    }
-
-    // Wait for producer to update metadata for the partition
-    TestUtils.waitUntilTrue(() => {
-      // ProduceResponse may contain KafkaStorageException and trigger metadata update
-      producer.send(record)
-      producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id() != leaderServerId
-    }, "Expected new leader for the partition", 6000L)
-
-    // Consumer should receive some messages
-    TestUtils.waitUntilTrue(() => {
-      consumer.poll(0).count() > 0
-    }, "Expected some messages", 3000L)
-
-    // There should be no remaining LogDirEventNotification znode
-    assertTrue(zkUtils.getChildrenParentMayNotExist(ZkUtils.LogDirEventNotificationPath).isEmpty)
-
-    // The controller should have marked the replica on the original leader as offline
-    val controllerServer = servers.find(_.kafkaController.isActive).get
-    val offlineReplicas = controllerServer.kafkaController.replicaStateMachine.replicasInState(topic, OfflineReplica)
-    assertTrue(offlineReplicas.contains(PartitionAndReplica(topic, 0, leaderServerId)))
-  }
-
-  private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
-    consumer.subscribe(Collections.singletonList(topic))
-    TestUtils.waitUntilTrue(() => {
-      consumer.poll(0)
-      !consumer.assignment.isEmpty
-    }, "Expected non-empty assignment")
-  }
-
-}
-
-object LogDirFailureTest {
-  sealed trait LogDirFailureType
-  case object Roll extends LogDirFailureType
-  case object Checkpoint extends LogDirFailureType
-}
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/20d9adb1/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 0af0a04..24421d0 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -182,14 +182,14 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
     debug("Follower for " + topic  + " is: %s".format(followerId))
 
-    produceMessage(servers, topic, null, "first")
+    produceMessage(servers, topic, "first")
     waitUntilMetadataIsPropagated(servers, topic, partitionId)
     assertEquals(List("first"), consumeAllMessages(topic))
 
     // shutdown follower server
     servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
 
-    produceMessage(servers, topic, null, "second")
+    produceMessage(servers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic))
 
     // shutdown leader and then restart follower
@@ -199,7 +199,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     // wait until new leader is (uncleanly) elected
     waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(followerId))
 
-    produceMessage(servers, topic, null, "third")
+    produceMessage(servers, topic, "third")
 
     // second message was lost due to unclean election
     assertEquals(List("first", "third"), consumeAllMessages(topic))
@@ -215,14 +215,14 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
     debug("Follower for " + topic  + " is: %s".format(followerId))
 
-    produceMessage(servers, topic, null, "first")
+    produceMessage(servers, topic, "first")
     waitUntilMetadataIsPropagated(servers, topic, partitionId)
     assertEquals(List("first"), consumeAllMessages(topic))
 
     // shutdown follower server
     servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
 
-    produceMessage(servers, topic, null, "second")
+    produceMessage(servers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic))
 
     // shutdown leader and then restart follower
@@ -234,7 +234,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
     // message production and consumption should both fail while leader is down
     try {
-      produceMessage(servers, topic, null, "third")
+      produceMessage(servers, topic, "third")
       fail("Message produced while leader is down should fail, but it succeeded")
     } catch {
       case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected
@@ -246,7 +246,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup())
     waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(leaderId))
 
-    produceMessage(servers, topic, null, "third")
+    produceMessage(servers, topic, "third")
     waitUntilMetadataIsPropagated(servers, topic, partitionId)
     servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/20d9adb1/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
new file mode 100644
index 0000000..438d736
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.io.File
+import java.util.Collections
+import java.util.concurrent.{ExecutionException, TimeUnit}
+
+import kafka.server.LogDirFailureTest._
+import kafka.api.IntegrationTestHarness
+import kafka.controller.{OfflineReplica, PartitionAndReplica}
+import kafka.utils.{CoreUtils, Exit, TestUtils, ZkUtils}
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderForPartitionException}
+import org.junit.{Before, Test}
+import org.junit.Assert.{assertTrue, assertFalse, assertEquals}
+
+import scala.collection.JavaConverters._
+
+/**
+  * Test whether clients can produce and consume when there is log directory failure
+  */
+class LogDirFailureTest extends IntegrationTestHarness {
+
+  val producerCount: Int = 1
+  val consumerCount: Int = 1
+  val serverCount: Int = 2
+  private val topic = "topic"
+  private val partitionNum = 12
+
+  this.logDirCount = 3
+  this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
+  this.producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
+  this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "60000")
+  this.serverConfig.setProperty(KafkaConfig.NumReplicaFetchersProp, "1")
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    TestUtils.createTopic(zkUtils, topic, partitionNum, serverCount, servers = servers)
+  }
+
+  @Test
+  def testIOExceptionDuringLogRoll() {
+    testProduceAfterLogDirFailureOnLeader(Roll)
+  }
+
+  @Test
+  // Broker should halt on any log directory failure if inter-broker protocol < 1.0
+  def brokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure() {
+    @volatile var statusCodeOption: Option[Int] = None
+    Exit.setHaltProcedure { (statusCode, _) =>
+      statusCodeOption = Some(statusCode)
+      throw new IllegalArgumentException
+    }
+
+    var server: KafkaServer = null
+    try {
+      val props = TestUtils.createBrokerConfig(serverCount, zkConnect, logDirCount = 3)
+      props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.11.0")
+      props.put(KafkaConfig.LogMessageFormatVersionProp, "0.11.0")
+      val kafkaConfig = KafkaConfig.fromProps(props)
+      val logDir = new File(kafkaConfig.logDirs.head)
+      // Make log directory of the partition on the leader broker inaccessible by replacing it with a file
+      CoreUtils.swallow(Utils.delete(logDir))
+      logDir.createNewFile()
+      assertTrue(logDir.isFile)
+
+      server = TestUtils.createServer(kafkaConfig)
+      TestUtils.waitUntilTrue(() => statusCodeOption.contains(1), "timed out waiting for broker to halt")
+    } finally {
+      Exit.resetHaltProcedure()
+      if (server != null)
+        TestUtils.shutdownServers(List(server))
+    }
+  }
+
+  @Test
+  def testIOExceptionDuringCheckpoint() {
+    testProduceAfterLogDirFailureOnLeader(Checkpoint)
+  }
+
+  @Test
+  def testReplicaFetcherThreadAfterLogDirFailureOnFollower() {
+    val producer = producers.head
+    val partition = new TopicPartition(topic, 0)
+
+    val partitionInfo = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get
+    val leaderServerId = partitionInfo.leader().id()
+    val leaderServer = servers.find(_.config.brokerId == leaderServerId).get
+    val followerServerId = partitionInfo.replicas().map(_.id()).find(_ != leaderServerId).get
+    val followerServer = servers.find(_.config.brokerId == followerServerId).get
+
+    followerServer.replicaManager.markPartitionOffline(partition)
+    // Send a message to another partition whose leader is the same as partition 0
+    // so that ReplicaFetcherThread on the follower will get response from leader immediately
+    val anotherPartitionWithTheSameLeader = (1 until partitionNum).find { i =>
+      leaderServer.replicaManager.getPartition(new TopicPartition(topic, i)).flatMap(_.leaderReplicaIfLocal).isDefined
+    }.get
+    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, anotherPartitionWithTheSameLeader, topic.getBytes, "message".getBytes)
+    // When producer.send(...).get returns, it is guaranteed that ReplicaFetcherThread on the follower
+    // has fetched from the leader and attempts to append to the offline replica.
+    producer.send(record).get
+
+    assertEquals(serverCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size)
+    followerServer.replicaManager.replicaFetcherManager.fetcherThreadMap.values.foreach { thread =>
+      assertFalse("ReplicaFetcherThread should still be working if its partition count > 0", thread.isShutdownComplete)
+    }
+  }
+
+  def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType) {
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+    val producer = producers.head
+    val partition = new TopicPartition(topic, 0)
+    val record = new ProducerRecord(topic, 0, s"key".getBytes, s"value".getBytes)
+
+    val leaderServerId = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id()
+    val leaderServer = servers.find(_.config.brokerId == leaderServerId).get
+
+    // The first send() should succeed
+    producer.send(record).get()
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(0).count() == 1
+    }, "Expected the first message", 3000L)
+
+    // Make log directory of the partition on the leader broker inaccessible by replacing it with a file
+    val replica = leaderServer.replicaManager.getReplicaOrException(partition)
+    val logDir = replica.log.get.dir.getParentFile
+    CoreUtils.swallow(Utils.delete(logDir))
+    logDir.createNewFile()
+    assertTrue(logDir.isFile)
+
+    if (failureType == Roll) {
+      try {
+        leaderServer.replicaManager.getLog(partition).get.roll()
+        fail("Log rolling should fail with KafkaStorageException")
+      } catch {
+        case e: KafkaStorageException => // This is expected
+      }
+    } else if (failureType == Checkpoint) {
+      leaderServer.replicaManager.checkpointHighWatermarks()
+    }
+
+    // Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline
+    TestUtils.waitUntilTrue(() => !leaderServer.logManager.isLogDirOnline(logDir.getAbsolutePath), "Expected log directory offline", 3000L)
+    assertTrue(leaderServer.replicaManager.getReplica(partition).isEmpty)
+
+    // The second send() should fail due to either KafkaStorageException or NotLeaderForPartitionException
+    try {
+      producer.send(record).get(6000, TimeUnit.MILLISECONDS)
+      fail("send() should fail with either KafkaStorageException or NotLeaderForPartitionException")
+    } catch {
+      case e: ExecutionException =>
+        e.getCause match {
+          case t: KafkaStorageException =>
+          case t: NotLeaderForPartitionException => // This may happen if ProduceRequest version <= 3
+          case t: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException instead of ${t.toString}")
+        }
+      case e: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException instead of ${e.toString}")
+    }
+
+    // Wait for producer to update metadata for the partition
+    TestUtils.waitUntilTrue(() => {
+      // ProduceResponse may contain KafkaStorageException and trigger metadata update
+      producer.send(record)
+      producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id() != leaderServerId
+    }, "Expected new leader for the partition", 6000L)
+
+    // Consumer should receive some messages
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(0).count() > 0
+    }, "Expected some messages", 3000L)
+
+    // There should be no remaining LogDirEventNotification znode
+    assertTrue(zkUtils.getChildrenParentMayNotExist(ZkUtils.LogDirEventNotificationPath).isEmpty)
+
+    // The controller should have marked the replica on the original leader as offline
+    val controllerServer = servers.find(_.kafkaController.isActive).get
+    val offlineReplicas = controllerServer.kafkaController.replicaStateMachine.replicasInState(topic, OfflineReplica)
+    assertTrue(offlineReplicas.contains(PartitionAndReplica(topic, 0, leaderServerId)))
+  }
+
+  private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
+    consumer.subscribe(Collections.singletonList(topic))
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(0)
+      !consumer.assignment.isEmpty
+    }, "Expected non-empty assignment")
+  }
+
+}
+
+object LogDirFailureTest {
+  sealed trait LogDirFailureType
+  case object Roll extends LogDirFailureType
+  case object Checkpoint extends LogDirFailureType
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/20d9adb1/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 687307a..1149db4 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1109,13 +1109,13 @@ object TestUtils extends Logging {
     values
   }
 
-  def produceMessage(servers: Seq[KafkaServer], topic: String, partition: Integer, message: String) {
+  def produceMessage(servers: Seq[KafkaServer], topic: String, message: String) {
     val producer = createNewProducer(
       TestUtils.getBrokerListStrFromServers(servers),
       retries = 5,
       requestTimeoutMs = 2000
     )
-    producer.send(new ProducerRecord(topic, partition, topic.getBytes, message.getBytes)).get
+    producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get
     producer.close()
   }
 

