kafka-commits mailing list archives

From j...@apache.org
Subject [1/5] kafka git commit: KAFKA-4763; Handle disk failure for JBOD (KIP-112)
Date Sat, 22 Jul 2017 19:36:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 91b5fc737 -> fc93fb4b6


http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 5d221fe..683b34e 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -16,13 +16,13 @@
 */
 package kafka.server
 
+import java.io.File
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.{Partition, Replica}
 import kafka.log.Log
-import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
-import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
+import kafka.server.epoch.LeaderEpochCache
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
@@ -54,9 +54,13 @@ class IsrExpirationTest {
 
   @Before
   def setUp() {
-    replicaManager = new ReplicaManager(configs.head, metrics, time, null, null, null, new AtomicBoolean(false),
-      QuotaFactory.instantiate(configs.head, metrics, time).follower, new BrokerTopicStats,
-      new MetadataCache(configs.head.brokerId))
+    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+    EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
+    EasyMock.replay(logManager)
+
+    replicaManager = new ReplicaManager(configs.head, metrics, time, null, null, logManager, new AtomicBoolean(false),
+      QuotaFactory.instantiate(configs.head, metrics, time).follower, new BrokerTopicStats, new MetadataCache(configs.head.brokerId),
+      new LogDirFailureChannel(configs.head.logDirs.size))
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 3497cc3..20526a1 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -145,10 +145,10 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
       val partitionStates = Map(
         new TopicPartition(topic, partitionId) -> new PartitionState(2, brokerId2, LeaderAndIsr.initialLeaderEpoch,
           Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava, LeaderAndIsr.initialZKVersion,
-          Seq(0, 1).map(Integer.valueOf).asJava)
+          Seq(0, 1).map(Integer.valueOf).asJava, false)
       )
       val requestBuilder = new LeaderAndIsrRequest.Builder(
-          controllerId, staleControllerEpoch, partitionStates.asJava, nodes.toSet.asJava)
+        ApiKeys.LEADER_AND_ISR.latestVersion, controllerId, staleControllerEpoch, partitionStates.asJava, nodes.toSet.asJava)
 
       controllerChannelManager.sendRequest(brokerId2, ApiKeys.LEADER_AND_ISR, requestBuilder,
         staleControllerEpochCallback)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 9383355..e053968 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -181,7 +181,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     AdminUtils.createTopic(zkUtils, topic, 3, 1)
 
     val logManager = server.getLogManager
-    val log = logManager.createLog(new TopicPartition(topic, part), logManager.defaultConfig)
+    val log = logManager.getOrCreateLog(new TopicPartition(topic, part), logManager.defaultConfig)
 
     for (_ <- 0 until 20)
       log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
@@ -210,7 +210,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     AdminUtils.createTopic(zkUtils, topic, 3, 1)
 
     val logManager = server.getLogManager
-    val log = logManager.createLog(new TopicPartition(topic, part), logManager.defaultConfig)
+    val log = logManager.getOrCreateLog(new TopicPartition(topic, part), logManager.defaultConfig)
     for (_ <- 0 until 20)
       log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
     log.flush()

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index d9fe995..0e90121 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -23,7 +23,7 @@ import kafka.common.BrokerEndPointNotAvailableException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
-import org.apache.kafka.common.requests.{PartitionState, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.UpdateMetadataRequest
 import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint}
 import org.junit.Test
 import org.junit.Assert._
@@ -65,9 +65,9 @@ class MetadataCacheTest {
     }.toSet
 
     val partitionStates = Map(
-      new TopicPartition(topic0, 0) -> new PartitionState(controllerEpoch, 0, 0, asList(0, 1, 3), zkVersion, asList(0, 1, 3)),
-      new TopicPartition(topic0, 1) -> new PartitionState(controllerEpoch, 1, 1, asList(1, 0), zkVersion, asList(1, 2, 0, 4)),
-      new TopicPartition(topic1, 0) -> new PartitionState(controllerEpoch, 2, 2, asList(2, 1), zkVersion, asList(2, 1, 3)))
+      new TopicPartition(topic0, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, 0, 0, asList(0, 1, 3), zkVersion, asList(0, 1, 3), asList()),
+      new TopicPartition(topic0, 1) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, 1, 1, asList(1, 0), zkVersion, asList(1, 2, 0, 4), asList()),
+      new TopicPartition(topic1, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, 2, 2, asList(2, 1), zkVersion, asList(2, 1, 3), asList()))
 
     val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
@@ -125,7 +125,7 @@ class MetadataCacheTest {
     val leader = 1
     val leaderEpoch = 1
     val partitionStates = Map(
-      new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, asList(0), zkVersion, asList(0)))
+      new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, asList(0), zkVersion, asList(0), asList()))
 
     val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
@@ -169,7 +169,7 @@ class MetadataCacheTest {
     val isr = asList[Integer](0)
 
     val partitionStates = Map(
-      new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas))
+      new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas, asList()))
 
     val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
@@ -229,7 +229,7 @@ class MetadataCacheTest {
     val isr = asList[Integer](0, 1)
 
     val partitionStates = Map(
-      new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas))
+      new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas, asList()))
 
     val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
@@ -282,7 +282,7 @@ class MetadataCacheTest {
     val replicas = asList[Integer](0)
     val isr = asList[Integer](0, 1)
     val partitionStates = Map(
-      new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas))
+      new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas, asList()))
     val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, partitionStates.asJava,
       brokers.asJava).build()
@@ -315,7 +315,7 @@ class MetadataCacheTest {
       val replicas = asList[Integer](0)
       val isr = asList[Integer](0, 1)
       val partitionStates = Map(
-        new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas))
+        new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas, asList()))
       val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
       val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, partitionStates.asJava,
         brokers.asJava).build()

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index b0e81a9..231b180 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -94,8 +94,8 @@ class ReplicaFetcherThreadTest {
     val result = thread.fetchEpochsFromLeader(Map(t1p0 -> 0, t1p1 -> 0))
 
     val expected = Map(
-      t1p0 -> new EpochEndOffset(Errors.UNKNOWN, UNDEFINED_EPOCH_OFFSET),
-      t1p1 -> new EpochEndOffset(Errors.UNKNOWN, UNDEFINED_EPOCH_OFFSET)
+      t1p0 -> new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, UNDEFINED_EPOCH_OFFSET),
+      t1p1 -> new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, UNDEFINED_EPOCH_OFFSET)
     )
 
     assertEquals("results from leader epoch request should have undefined offset", expected, result)
@@ -271,7 +271,7 @@ class ReplicaFetcherThreadTest {
     //Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation
     val offsetsReply = mutable.Map(
       t1p0 -> new EpochEndOffset(NOT_LEADER_FOR_PARTITION, EpochEndOffset.UNDEFINED_EPOCH_OFFSET),
-      t1p1 -> new EpochEndOffset(UNKNOWN, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)
+      t1p1 -> new EpochEndOffset(UNKNOWN_SERVER_ERROR, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)
     ).asJava
 
     //Create the thread

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 2ee08a2..483e708 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -16,6 +16,7 @@
   */
 package kafka.server
 
+import java.io.File
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
@@ -178,11 +179,12 @@ class ReplicaManagerQuotasTest {
 
     //Return the same log for each partition as it doesn't matter
     expect(logManager.getLog(anyObject())).andReturn(Some(log)).anyTimes()
+    expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
     replay(logManager)
 
     replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
       new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
-      new BrokerTopicStats, new MetadataCache(configs.head.brokerId))
+      new BrokerTopicStats, new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
 
     //create the two replicas
     for ((p, _) <- fetchInfo) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 5794854..6f2a6a7 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -27,7 +27,7 @@ import TestUtils.createBroker
 import kafka.utils.timer.MockTimer
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest, PartitionState}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -48,13 +48,13 @@ class ReplicaManagerTest {
   val metrics = new Metrics
   var zkClient : ZkClient = _
   var zkUtils : ZkUtils = _
-    
+
   @Before
   def setUp() {
     zkClient = EasyMock.createMock(classOf[ZkClient])
     zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
   }
-  
+
   @After
   def tearDown() {
     metrics.close()
@@ -67,7 +67,7 @@ class ReplicaManagerTest {
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
-      new MetadataCache(config.brokerId))
+      new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     try {
       val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
       partition.getOrCreateReplica(1)
@@ -86,7 +86,7 @@ class ReplicaManagerTest {
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
-      new MetadataCache(config.brokerId))
+      new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     try {
       val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
       partition.getOrCreateReplica(1)
@@ -104,7 +104,7 @@ class ReplicaManagerTest {
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
-      new MetadataCache(config.brokerId), Option(this.getClass.getName))
+      new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), Option(this.getClass.getName))
     try {
       def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
         assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS)
@@ -137,7 +137,7 @@ class ReplicaManagerTest {
     EasyMock.replay(metadataCache)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
-      metadataCache)
+      metadataCache, new LogDirFailureChannel(config.logDirs.size))
 
     try {
       val brokerList = Seq[Integer](0, 1).asJava
@@ -145,8 +145,8 @@ class ReplicaManagerTest {
       val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0))
       partition.getOrCreateReplica(0)
       // Make this replica the leader.
-      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
-        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerList)).asJava,
+      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerList, false)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
@@ -162,8 +162,8 @@ class ReplicaManagerTest {
       assertFalse(fetchResult.isFired)
 
       // Make this replica the follower
-      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0,
-        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, brokerList, 0, brokerList)).asJava,
+      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, brokerList, 0, brokerList, false)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
 
@@ -186,8 +186,8 @@ class ReplicaManagerTest {
       partition.getOrCreateReplica(0)
 
       // Make this replica the leader.
-      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
-        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerList)).asJava,
+      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerList, true)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
@@ -271,14 +271,12 @@ class ReplicaManagerTest {
 
     try {
       val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava
-      val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1).asJava
-
       val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
       partition.getOrCreateReplica(0)
 
       // Make this replica the leader.
-      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
-        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerList)).asJava,
+      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerList, true)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
@@ -346,7 +344,7 @@ class ReplicaManagerTest {
     EasyMock.replay(metadataCache)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
-      metadataCache, Option(this.getClass.getName))
+      metadataCache, new LogDirFailureChannel(config.logDirs.size), Option(this.getClass.getName))
 
     try {
       val brokerList = Seq[Integer](0, 1, 2).asJava
@@ -355,8 +353,8 @@ class ReplicaManagerTest {
       partition.getOrCreateReplica(0)
 
       // Make this replica the leader.
-      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
-        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerList)).asJava,
+      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerList, false)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava).build()
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
@@ -498,7 +496,8 @@ class ReplicaManagerTest {
 
     new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
-      metadataCache, mockProducePurgatory, mockFetchPurgatory, mockDeleteRecordsPurgatory, Option(this.getClass.getName))
+      metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory,
+      mockDeleteRecordsPurgatory, Option(this.getClass.getName))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 7c171b0..ecb75ce 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -180,15 +180,16 @@ class RequestQuotaTest extends BaseRequestTest {
             .setTargetTimes(Map(tp -> (0L: java.lang.Long)).asJava)
 
         case ApiKeys.LEADER_AND_ISR =>
-          new LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue,
-            Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava,
+          new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, brokerId, Int.MaxValue,
+            Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, true)).asJava,
             Set(new Node(brokerId, "localhost", 0)).asJava)
 
         case ApiKeys.STOP_REPLICA =>
           new StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava)
 
         case ApiKeys.UPDATE_METADATA_KEY =>
-          val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava
+          val partitionState = Map(tp -> new UpdateMetadataRequest.PartitionState(
+            Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, Seq.empty[Integer].asJava)).asJava
           val securityProtocol = SecurityProtocol.PLAINTEXT
           val brokers = Set(new UpdateMetadataRequest.Broker(brokerId,
             Seq(new UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol,

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 72d7fc5..a20c659 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -16,6 +16,8 @@
  */
 package kafka.server
 
+import java.io.File
+
 import kafka.api._
 import kafka.utils._
 import kafka.cluster.Replica
@@ -105,12 +107,13 @@ class SimpleFetchTest {
     // create the log manager that is aware of this mock log
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
     EasyMock.expect(logManager.getLog(topicPartition)).andReturn(Some(log)).anyTimes()
+    EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
     EasyMock.replay(logManager)
 
     // create the replica manager
     replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
       new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, new BrokerTopicStats,
-      new MetadataCache(configs.head.brokerId))
+      new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
 
     // add the partition with two replicas, both in ISR
     val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, partitionId))

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
index 4daaa72..f4998f6 100644
--- a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
+++ b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
@@ -16,10 +16,10 @@
   */
 package kafka.server.checkpoints
 
-import java.io.IOException
-
+import kafka.server.LogDirFailureChannel
 import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
 import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnitSuite
@@ -89,12 +89,14 @@ class OffsetCheckpointFileTest extends JUnitSuite with Logging {
     assertEquals(Map(), checkpoint.read())
   }
 
-  @Test(expected = classOf[IOException])
+  @Test(expected = classOf[KafkaStorageException])
   def shouldThrowIfVersionIsNotRecognised(): Unit = {
-    val checkpointFile = new CheckpointFile(TestUtils.tempFile(), OffsetCheckpointFile.CurrentVersion + 1,
-      OffsetCheckpointFile.Formatter)
+    val file = TestUtils.tempFile()
+    val logDirFailureChannel = new LogDirFailureChannel(10)
+    val checkpointFile = new CheckpointFile(file, OffsetCheckpointFile.CurrentVersion + 1,
+      OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
     checkpointFile.write(Seq(new TopicPartition("foo", 5) -> 10L))
-    new OffsetCheckpointFile(checkpointFile.file).read()
+    new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read()
   }
 
 }

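For illustration only (not part of this patch): the updated test above exercises the new checkpoint constructors, which now take a LogDirFailureChannel and the log directory so that I/O problems surface as KafkaStorageException rather than a bare IOException. Below is a minimal round-trip sketch built from the same calls used in the test; the channel capacity of 10 and the offsets are illustrative assumptions.

    import kafka.server.LogDirFailureChannel
    import kafka.server.checkpoints.{CheckpointFile, OffsetCheckpointFile}
    import kafka.utils.TestUtils
    import org.apache.kafka.common.TopicPartition

    object CheckpointRoundTripExample extends App {
      val file = TestUtils.tempFile()
      val failureChannel = new LogDirFailureChannel(10)

      // Write with the version the reader expects, then read it back through OffsetCheckpointFile.
      val writer = new CheckpointFile(file, OffsetCheckpointFile.CurrentVersion,
        OffsetCheckpointFile.Formatter, failureChannel, file.getParent)
      writer.write(Seq(new TopicPartition("foo", 5) -> 10L))

      val offsets = new OffsetCheckpointFile(file, failureChannel).read()
      assert(offsets(new TopicPartition("foo", 5)) == 10L)
    }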
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index d004641..e8c08fe 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -16,6 +16,7 @@
   */
 package kafka.server.epoch
 
+import java.io.File
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.Replica
@@ -30,6 +31,7 @@ import org.easymock.EasyMock._
 import org.junit.Assert._
 import org.junit.Test
 
+
 class OffsetsForLeaderEpochTest {
   private val config = TestUtils.createBrokerConfigs(1, TestUtils.MockZkConnect).map(KafkaConfig.fromProps).head
   private val time = new MockTime
@@ -46,14 +48,16 @@ class OffsetsForLeaderEpochTest {
     //Stubs
     val mockLog = createNiceMock(classOf[kafka.log.Log])
     val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache])
+    val logManager = createNiceMock(classOf[kafka.log.LogManager])
     expect(mockCache.endOffsetFor(epochRequested)).andReturn(offset)
     expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()
-    replay(mockCache, mockLog)
+    expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
+    replay(mockCache, mockLog, logManager)
 
     // create a replica manager with 1 partition that has 1 replica
-    val replicaManager = new ReplicaManager(config, metrics, time, null, null, null, new AtomicBoolean(false),
+    val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
       QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
-      new MetadataCache(config.brokerId))
+      new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     val partition = replicaManager.getOrCreatePartition(tp)
     val leaderReplica = new Replica(config.brokerId, partition, time, 0, Some(mockLog))
     partition.addReplicaIfNotExists(leaderReplica)
@@ -68,10 +72,14 @@ class OffsetsForLeaderEpochTest {
 
   @Test
   def shouldReturnNoLeaderForPartitionIfThrown(): Unit = {
+    val logManager = createNiceMock(classOf[kafka.log.LogManager])
+    expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
+    replay(logManager)
+
     //create a replica manager with 1 partition that has 0 replica
-    val replicaManager = new ReplicaManager(config, metrics, time, null, null, null, new AtomicBoolean(false),
+    val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
       QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
-      new MetadataCache(config.brokerId))
+      new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     replicaManager.getOrCreatePartition(tp)
 
     //Given
@@ -87,10 +95,14 @@ class OffsetsForLeaderEpochTest {
 
   @Test
   def shouldReturnUnknownTopicOrPartitionIfThrown(): Unit = {
+    val logManager = createNiceMock(classOf[kafka.log.LogManager])
+    expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
+    replay(logManager)
+
     //create a replica manager with 0 partition
-    val replicaManager = new ReplicaManager(config, metrics, time, null, null, null, new AtomicBoolean(false),
+    val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
       QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
-      new MetadataCache(config.brokerId))
+      new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
 
     //Given
     val epochRequested: Integer = 5

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 4976f52..d4f00a6 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -156,11 +156,12 @@ object TestUtils extends Logging {
     enableSsl: Boolean = false,
     enableSaslPlaintext: Boolean = false,
     enableSaslSsl: Boolean = false,
-    rackInfo: Map[Int, String] = Map()): Seq[Properties] = {
+    rackInfo: Map[Int, String] = Map(),
+    logDirCount: Int = 1): Seq[Properties] = {
     (0 until numConfigs).map { node =>
       createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, RandomPort,
         interBrokerSecurityProtocol, trustStoreFile, saslProperties, enablePlaintext = enablePlaintext, enableSsl = enableSsl,
-        enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl, rack = rackInfo.get(node))
+        enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl, rack = rackInfo.get(node), logDirCount = logDirCount)
     }
   }
 
@@ -205,7 +206,7 @@ object TestUtils extends Logging {
     enablePlaintext: Boolean = true,
     enableSaslPlaintext: Boolean = false, saslPlaintextPort: Int = RandomPort,
     enableSsl: Boolean = false, sslPort: Int = RandomPort,
-    enableSaslSsl: Boolean = false, saslSslPort: Int = RandomPort, rack: Option[String] = None)
+    enableSaslSsl: Boolean = false, saslSslPort: Int = RandomPort, rack: Option[String] = None, logDirCount: Int = 1)
   : Properties = {
 
     def shouldEnable(protocol: SecurityProtocol) = interBrokerSecurityProtocol.fold(false)(_ == protocol)
@@ -227,7 +228,12 @@ object TestUtils extends Logging {
     val props = new Properties
     if (nodeId >= 0) props.put(KafkaConfig.BrokerIdProp, nodeId.toString)
     props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.LogDirProp, TestUtils.tempDir().getAbsolutePath)
+    if (logDirCount > 1) {
+      val logDirs = (1 to logDirCount).toList.map(i => TestUtils.tempDir().getAbsolutePath).mkString(",")
+      props.put(KafkaConfig.LogDirsProp, logDirs)
+    } else {
+      props.put(KafkaConfig.LogDirProp, TestUtils.tempDir().getAbsolutePath)
+    }
     props.put(KafkaConfig.ZkConnectProp, zkConnect)
     props.put(KafkaConfig.ZkConnectionTimeoutMsProp, "10000")
     props.put(KafkaConfig.ReplicaSocketTimeoutMsProp, "1500")
@@ -1010,6 +1016,7 @@ object TestUtils extends Logging {
                        cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false),
                        time: MockTime = new MockTime()): LogManager = {
     new LogManager(logDirs = logDirs,
+                   initialOfflineDirs = Array.empty[File],
                    topicConfigs = Map(),
                    defaultConfig = defaultConfig,
                    cleanerConfig = cleanerConfig,
@@ -1022,7 +1029,8 @@ object TestUtils extends Logging {
                    scheduler = time.scheduler,
                    time = time,
                    brokerState = BrokerState(),
-                   brokerTopicStats = new BrokerTopicStats)
+                   brokerTopicStats = new BrokerTopicStats,
+                   logDirFailureChannel = new LogDirFailureChannel(logDirs.size))
   }
 
   @deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0")
@@ -1161,7 +1169,7 @@ object TestUtils extends Logging {
       servers.forall(server => topicPartitions.forall(tp => server.getLogManager().getLog(tp).isEmpty)))
     // ensure that topic is removed from all cleaner offsets
     TestUtils.waitUntilTrue(() => servers.forall(server => topicPartitions.forall { tp =>
-      val checkpoints = server.getLogManager().logDirs.map { logDir =>
+      val checkpoints = server.getLogManager().liveLogDirs.map { logDir =>
         new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint")).read()
       }
       checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))

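For illustration only (not part of this patch): the logDirCount parameter added to TestUtils.createBrokerConfigs above is what lets a test provision JBOD-style brokers with several log directories. A minimal Scala sketch of how a test might use it; the broker count, directory count and the assertion are illustrative assumptions.

    import java.util.Properties
    import kafka.server.KafkaConfig
    import kafka.utils.TestUtils

    object JbodBrokerConfigExample extends App {
      // Three broker configs, each backed by two temporary log directories.
      val jbodConfigs: Seq[Properties] =
        TestUtils.createBrokerConfigs(3, TestUtils.MockZkConnect, logDirCount = 2)

      // With logDirCount > 1 the helper writes a comma-separated log.dirs list
      // instead of the single log.dir property.
      jbodConfigs.foreach { props =>
        assert(props.getProperty(KafkaConfig.LogDirsProp).split(",").length == 2)
      }
    }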
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index f90ced3..37313b6 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -17,7 +17,54 @@
 
 <script><!--#include virtual="js/templateData.js" --></script>
 
-<script id="upgrade-template" type="text/x-handlebars-template">
+<h4><a id="upgrade_11_1_0" href="#upgrade_11_1_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x or 0.11.0.0 to 0.11.1.0</a></h4>
+<p>Kafka 0.11.1.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,
+    you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_1110_notable">notable changes in 0.11.1.0</a> before upgrading.
+</p>
+
+<p><b>For a rolling upgrade:</b></p>
+
+<ol>
+    <li> Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
+        are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have
+        not overridden the message format previously, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
+        <ul>
+            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0).</li>
+        </ul>
+    </li>
+    <li> Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. </li>
+    <li> Once the entire cluster is upgraded, bump the protocol version by editing <code>inter.broker.protocol.version</code> and setting it to 0.11.1. </li>
+    <li> Restart the brokers one by one for the new protocol version to take effect. </li>
+</ol>
+
+<p><b>Additional Upgrade Notes:</b></p>
+
+<ol>
+    <li>If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start
+        with the new protocol by default.</li>
+    <li>Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after.
+        Similarly for the message format version.</li>
+</ol>
+
+<h5><a id="upgrade_1110_notable" href="#upgrade_1100_notable">Notable changes in 0.11.1.0</a></h5>
+<ul>
+    <li>If the <code>inter.broker.protocol.version</code> is 0.11.1 or later, a broker will now stay online to serve replicas
+        on live log directories even if there are offline log directories. A log directory may become offline due to an IOException
+        caused by hardware failure. Users need to monitor the per-broker metric <code>offlineLogDirectoryCount</code> to check
+        whether there are offline log directories. </li>
+    <li>Added KafkaStorageException, which is a retriable exception. KafkaStorageException will be converted to NotLeaderForPartitionException in the response
+        if the version of the client's FetchRequest or ProduceRequest does not support KafkaStorageException. </li>
+</ul>
+
+<h5><a id="upgrade_1110_new_protocols" href="#upgrade_1110_new_protocols">New Protocol Versions</a></h5>
+<ul>
+    <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD">KIP-112</a>: LeaderAndIsrRequest v1 introduces a partition-level <code>is_new</code> field. </li>
+    <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD">KIP-112</a>: UpdateMetadataRequest v4 introduces a partition-level <code>offline_replicas</code> field. </li>
+    <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD">KIP-112</a>: MetadataResponse v5 introduces a partition-level <code>offline_replicas</code> field. </li>
+    <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD">KIP-112</a>: ProduceResponse v4 introduces error code for KafkaStorageException. </li>
+    <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD">KIP-112</a>: FetchResponse v6 introduces error code for KafkaStorageException. </li>
+</ul>
+
 
 <h4><a id="upgrade_1_0_0" href="#upgrade_1_0_0"</h4> Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2 or 0.11.0.0 to 1.0.0</a></h4>
 <p>1.0.0 is fully compatible with 0.11.0.0. The upgrade can be done one broker at a time by simply bringing it down, updating the code, and restarting it.
@@ -73,7 +120,7 @@ To upgrade from earlier versions, please review the <a href="#upgrade_11_0_0">0.
     Similarly for the message format version.</li>
   <li>It is also possible to enable the 0.11.0 message format on individual topics using the topic admin tool (<code>bin/kafka-topics.sh</code>)
     prior to updating the global setting <code>log.message.format.version</code>.</li>
-  <li>If  you are upgrading from a version prior to 0.10.0, it is NOT necessary to first update the message format to 0.10.0
+  <li>If you are upgrading from a version prior to 0.10.0, it is NOT necessary to first update the message format to 0.10.0
     before you switch to 0.11.0.</li>
 </ol>
 
@@ -98,7 +145,7 @@ To upgrade from earlier versions, please review the <a href="#upgrade_11_0_0">0.
         individual messages is only reduced by the overhead of the batch format. However, there are some subtle implications
         for message format conversion (see <a href="#upgrade_11_message_format">below</a> for more detail). Note also
         that while previously the broker would ensure that at least one message is returned in each fetch request (regardless of the
-        total and partition-level fetch sizes), the same behavior now applies to one message batch.</li> 
+        total and partition-level fetch sizes), the same behavior now applies to one message batch.</li>
     <li>GC log rotation is enabled by default, see KAFKA-3754 for details.</li>
     <li>Deprecated constructors of RecordMetadata, MetricName and Cluster classes have been removed.</li>
     <li>Added user headers support through a new Headers interface providing user headers read and write access.</li>
@@ -151,7 +198,7 @@ To upgrade from earlier versions, please review the <a href="#upgrade_11_0_0">0.
   <li>EoS in Kafka introduces new request APIs and modifies several existing ones. See
     <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-RPCProtocolSummary">KIP-98</a>
     for the full details</code></li>
-</ol>    
+</ol>
 
 <h5><a id="upgrade_11_message_format" href="#upgrade_11_message_format">Notes on the new message format in 0.11.0</a></h5>
 <p>The 0.11.0 message format includes several major enhancements in order to support better delivery semantics for the producer
@@ -182,7 +229,7 @@ To upgrade from earlier versions, please review the <a href="#upgrade_11_0_0">0.
   are upgraded to the latest 0.11.0 client. Significantly, since the old consumer has been deprecated in 0.11.0.0, it does not support
   the new message format. You must upgrade to use the new consumer to use the new message format without the cost of down-conversion.
  Note that 0.11.0 consumers support backwards compatibility with 0.10.0 brokers and upward, so it is possible to upgrade the
-  clients first before the brokers. 
+  clients first before the brokers.
 </p>
 
 <h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.x to 0.10.2.0</a></h4>

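For illustration only (not part of this patch): the upgrade notes above state that KafkaStorageException is retriable and is mapped to NotLeaderForPartitionException for client request versions that do not support it. A hedged Scala sketch of how a producer application might treat these errors; the broker address, topic name and retry count are illustrative assumptions.

    import java.util.Properties
    import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
    import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderForPartitionException}
    import org.apache.kafka.common.serialization.StringSerializer

    object StorageErrorAwareProducer extends App {
      val props = new Properties()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")   // placeholder address
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
      props.put(ProducerConfig.RETRIES_CONFIG, "10")                         // retriable errors are retried internally

      val producer = new KafkaProducer[String, String](props)
      producer.send(new ProducerRecord[String, String]("test-topic", "key", "value"), new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = exception match {
          case null =>
            println(s"Appended to ${metadata.topic}-${metadata.partition} at offset ${metadata.offset}")
          case _: KafkaStorageException | _: NotLeaderForPartitionException =>
            // Both are retriable; with retries > 0 the producer refreshes metadata and resends
            // to the new leader, so these normally appear only after retries are exhausted.
            println(s"Retriable storage/leadership error: ${exception.getMessage}")
          case other =>
            throw other   // non-retriable: surface to the application
        }
      })
      producer.close()
    }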
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 7cdc180..eb844d3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.integration.utils;
 
-import kafka.api.PartitionStateInfo;
+import kafka.api.MetadataPartitionState;
 import kafka.api.Request;
 import kafka.server.KafkaServer;
 import kafka.server.MetadataCache;
@@ -225,13 +225,13 @@ public class IntegrationTestUtils {
             public boolean conditionMet() {
                 for (final KafkaServer server : servers) {
                     final MetadataCache metadataCache = server.apis().metadataCache();
-                    final Option<PartitionStateInfo> partitionInfo =
+                    final Option<MetadataPartitionState> partitionInfo =
                             metadataCache.getPartitionInfo(topic, partition);
                     if (partitionInfo.isEmpty()) {
                         return false;
                     }
-                    final PartitionStateInfo partitionStateInfo = partitionInfo.get();
-                    if (!Request.isValidBrokerId(partitionStateInfo.leaderIsrAndControllerEpoch().leaderAndIsr().leader())) {
+                    final MetadataPartitionState metadataPartitionState = partitionInfo.get();
+                    if (!Request.isValidBrokerId(metadataPartitionState.leaderIsrAndControllerEpoch().leaderAndIsr().leader())) {
                         return false;
                     }
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 1a46356..469f9cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -133,7 +133,7 @@ public class InternalTopicManagerTest {
         @Override
         public MetadataResponse fetchMetadata() {
             Node node = new Node(1, "host1", 1001);
-            MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>());
+            MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>(), new ArrayList<Node>());
             MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata));
             MetadataResponse response = new MetadataResponse(Collections.<Node>singletonList(node), null, MetadataResponse.NO_CONTROLLER_ID,
                 Collections.singletonList(topicMetadata));

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/tests/kafkatest/services/kafka/config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/config.py b/tests/kafkatest/services/kafka/config.py
index 1b81b43..462277f 100644
--- a/tests/kafkatest/services/kafka/config.py
+++ b/tests/kafkatest/services/kafka/config.py
@@ -24,7 +24,7 @@ class KafkaConfig(dict):
     DEFAULTS = {
         config_property.PORT: 9092,
         config_property.SOCKET_RECEIVE_BUFFER_BYTES: 65536,
-        config_property.LOG_DIRS: "/mnt/kafka-data-logs",
+        config_property.LOG_DIRS: "/mnt/kafka-data-logs-1,/mnt/kafka-data-logs-2",
         config_property.ZOOKEEPER_CONNECTION_TIMEOUT_MS: 2000
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/tests/kafkatest/services/kafka/config_property.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index a9ba40d..8b64d0e 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -44,6 +44,10 @@ MESSAGE_FORMAT_VERSION = "log.message.format.version"
 MESSAGE_TIMESTAMP_TYPE = "message.timestamp.type"
 THROTTLING_REPLICATION_RATE_LIMIT = "replication.quota.throttled.rate"
 
+LOG_FLUSH_INTERVAL_MESSAGE = "log.flush.interval.messages"
+REPLICA_HIGHWATERMARK_CHECKPOINT_INTERVAL_MS = "replica.high.watermark.checkpoint.interval.ms"
+LOG_ROLL_TIME_MS = "log.roll.ms"
+
 """
 From KafkaConfig.scala
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index b22b518..e941a3d 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -45,7 +45,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     OPERATIONAL_LOG_INFO_DIR = os.path.join(OPERATIONAL_LOG_DIR, "info")
     OPERATIONAL_LOG_DEBUG_DIR = os.path.join(OPERATIONAL_LOG_DIR, "debug")
     # Kafka log segments etc go here
-    DATA_LOG_DIR = os.path.join(PERSISTENT_ROOT, "kafka-data-logs")
+    DATA_LOG_DIR_PREFIX = os.path.join(PERSISTENT_ROOT, "kafka-data-logs")
+    DATA_LOG_DIR_1 = "%s-1" % (DATA_LOG_DIR_PREFIX)
+    DATA_LOG_DIR_2 = "%s-2" % (DATA_LOG_DIR_PREFIX)
     CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka.properties")
     # Kafka Authorizer
     SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer"
@@ -60,8 +62,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         "kafka_operational_logs_debug": {
             "path": OPERATIONAL_LOG_DEBUG_DIR,
             "collect_default": False},
-        "kafka_data": {
-            "path": DATA_LOG_DIR,
+        "kafka_data_1": {
+            "path": DATA_LOG_DIR_1,
+            "collect_default": False},
+        "kafka_data_2": {
+            "path": DATA_LOG_DIR_2,
             "collect_default": False}
     }
 
@@ -150,6 +155,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         else:
             self.minikdc = None
 
+    def alive(self, node):
+        return len(self.pids(node)) > 0
+
     def start(self, add_principals=""):
         self.open_port(self.security_protocol)
         self.open_port(self.interbroker_security_protocol)
@@ -262,7 +270,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         JmxMixin.clean_node(self, node)
         self.security_config.clean_node(node)
         node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf /mnt/*", allow_fail=False)
+        node.account.ssh("sudo rm -rf /mnt/*", allow_fail=False)
 
     def create_topic(self, topic_cfg, node=None):
         """Run the admin tool create topic command.
@@ -444,7 +452,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                           len(messages))
         for node in self.nodes:
             # Grab all .log files in directories prefixed with this topic
-            files = node.account.ssh_capture("find %s -regex  '.*/%s-.*/[^/]*.log'" % (KafkaService.DATA_LOG_DIR, topic))
+            files = node.account.ssh_capture("find %s* -regex  '.*/%s-.*/[^/]*.log'" % (KafkaService.DATA_LOG_DIR_PREFIX, topic))
 
             # Check each data file to see if it contains the messages we want
             for log in files:
@@ -471,10 +479,45 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.stop_node(node, clean_shutdown)
         self.start_node(node)
 
+    def isr_idx_list(self, topic, partition=0):
+        """ Get in-sync replica list the given topic and partition.
+        """
+        self.logger.debug("Querying zookeeper to find in-sync replicas for topic %s and partition %d" % (topic, partition))
+        zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition)
+        partition_state = self.zk.query(zk_path)
+
+        if partition_state is None:
+            raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
+
+        partition_state = json.loads(partition_state)
+        self.logger.info(partition_state)
+
+        isr_idx_list = partition_state["isr"]
+        self.logger.info("Isr for topic %s and partition %d is now: %s" % (topic, partition, isr_idx_list))
+        return isr_idx_list
+
+    def replicas(self, topic, partition=0):
+        """ Get the assigned replicas for the given topic and partition.
+        """
+        self.logger.debug("Querying zookeeper to find assigned replicas for topic %s and partition %d" % (topic, partition))
+        zk_path = "/brokers/topics/%s" % (topic)
+        assignment = self.zk.query(zk_path)
+
+        if assignment is None:
+            raise Exception("Error finding assigned replicas for topic %s and partition %d." % (topic, partition))
+
+        assignment = json.loads(assignment)
+        self.logger.info(assignment)
+
+        replicas = assignment["partitions"][str(partition)]
+
+        self.logger.info("Assigned replicas for topic %s and partition %d are now: %s" % (topic, partition, replicas))
+        return [self.get_node(replica) for replica in replicas]
+
     def leader(self, topic, partition=0):
         """ Get the leader replica for the given topic and partition.
         """
-        self.logger.debug("Querying zookeeper to find leader replica for topic: \n%s" % (topic))
+        self.logger.debug("Querying zookeeper to find leader replica for topic %s and partition %d" % (topic, partition))
         zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition)
         partition_state = self.zk.query(zk_path)
 
@@ -541,7 +584,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.logger.debug(output)
         return output
 
-    def bootstrap_servers(self, protocol='PLAINTEXT', validate=True):
+    def bootstrap_servers(self, protocol='PLAINTEXT', validate=True, offline_nodes=[]):
         """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
 
         This is the format expected by many config files.
@@ -552,7 +595,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         if validate and not port_mapping.open:
             raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " % str(port_mapping))
 
-        return ','.join([node.account.hostname + ":" + str(port_mapping.number) for node in self.nodes])
+        return ','.join([node.account.hostname + ":" + str(port_mapping.number) for node in self.nodes if node not in offline_nodes])
 
     def controller(self):
         """ Get the controller node

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 3cf3abd..6ba3e86 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -56,7 +56,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
 
     def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000,
                  message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None,
-                 stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO", enable_idempotence=False):
+                 stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO", enable_idempotence=False, offline_nodes=[]):
         """
         :param max_messages is a number of messages to be produced per producer
         :param message_validator checks for an expected format of messages produced. There are
@@ -90,6 +90,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         self.stop_timeout_sec = stop_timeout_sec
         self.request_timeout_sec = request_timeout_sec
         self.enable_idempotence = enable_idempotence
+        self.offline_nodes = offline_nodes
 
     def java_class_name(self):
         return "VerifiableProducer"
@@ -184,7 +185,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG
         cmd += self.impl.exec_cmd(node)
-        cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol))
+        cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol, True, self.offline_nodes))
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
         if self.throughput > 0:

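The new offline_nodes argument is only threaded through to bootstrap_servers() when the producer builds its --broker-list, so the producer never tries to bootstrap from a broker the test has already shut down. A hedged usage sketch, mirroring how the new log_dir_failure_test.py below constructs the producer (test_context, kafka and broker_node are assumed to come from the surrounding ducktape test):

# Every broker other than broker_node has been stopped by the test.
offline_nodes = [node for node in kafka.nodes if node != broker_node]

producer = VerifiableProducer(test_context, num_nodes=1, kafka=kafka, topic="test_topic_2",
                              throughput=1000, offline_nodes=offline_nodes)
producer.start()  # --broker-list now lists only the brokers that are still online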
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/tests/kafkatest/tests/core/log_dir_failure_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/log_dir_failure_test.py b/tests/kafkatest/tests/core/log_dir_failure_test.py
new file mode 100644
index 0000000..faa13c0
--- /dev/null
+++ b/tests/kafkatest/tests/core/log_dir_failure_test.py
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+from kafkatest.services.kafka import config_property
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+
+
+def select_node(test, broker_type, topic):
+    """ Discover node of requested type. For leader type, discovers leader for our topic and partition 0
+    """
+    if broker_type == "leader":
+        node = test.kafka.leader(topic, partition=0)
+    elif broker_type == "follower":
+        leader = test.kafka.leader(topic, partition=0)
+        node = [replica for replica in test.kafka.replicas(topic, partition=0) if replica != leader][0]
+    elif broker_type == "controller":
+        node = test.kafka.controller()
+    else:
+        raise Exception("Unexpected broker type %s." % (broker_type))
+
+    return node
+
+
+class LogDirFailureTest(ProduceConsumeValidateTest):
+    """
+    Note that consuming is a bit tricky, at least with the console consumer. The goal is to consume all messages
+    (for each partition) in the topic. Waiting for the last message may cause the consumer to stop too soon, since
+    the console consumer reads multiple partitions from a single thread and we therefore lose ordering guarantees.
+
+    Waiting on a count of consumed messages can also be unreliable: if we stop consuming when num_consumed == num_acked,
+    we might exit early if some messages are duplicated (though that is not an issue here since producer retries==0).
+
+    We therefore rely on the consumer.timeout.ms setting, which times out on the interval between successively
+    consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
+    indicator that nothing is left to consume.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(LogDirFailureTest, self).__init__(test_context=test_context)
+
+        self.topic1 = "test_topic_1"
+        self.topic2 = "test_topic_2"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context,
+                                  num_nodes=3,
+                                  zk=self.zk,
+                                  topics={
+                                      self.topic1: {"partitions": 1, "replication-factor": 3, "configs": {"min.insync.replicas": 2}},
+                                      self.topic2: {"partitions": 1, "replication-factor": 3, "configs": {"min.insync.replicas": 1}}
+                                  },
+                                  # Set log.roll.ms to 3 seconds so that the broker detects the disk error sooner, when it creates a new log segment.
+                                  # Otherwise the broker could keep reading/writing the already-open log file even though the log directory is inaccessible.
+                                  server_prop_overides=[
+                                      [config_property.LOG_FLUSH_INTERVAL_MESSAGE, "5"],
+                                      [config_property.REPLICA_HIGHWATERMARK_CHECKPOINT_INTERVAL_MS, "60000"],
+                                      [config_property.LOG_ROLL_TIME_MS, "3000"]
+                                  ])
+
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+    def setUp(self):
+        self.zk.start()
+
+    def min_cluster_size(self):
+        """Override this since we're adding services outside of the constructor"""
+        return super(LogDirFailureTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2
+
+    @cluster(num_nodes=9)
+    @matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"])
+    def test_replication_with_disk_failure(self, bounce_broker, security_protocol, broker_type):
+        """Replication tests.
+        These tests verify that replication provides simple durability guarantees by checking that data acked by
+        brokers is still available for consumption in the face of various failure scenarios.
+
+        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
+               and another topic with partitions=3, replication-factor=3, and min.insync.replicas=1
+        
+            - Produce messages in the background
+            - Consume messages in the background
+            - Drive broker failures (shutdown, or bounce repeatedly with kill -15 or kill -9)
+            - When done driving failures, stop producing, and finish consuming
+            - Validate that every acked message was consumed
+        """
+
+        self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.start()
+
+        try:
+            # Initialize producer/consumer for topic1
+            self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic1,
+                                               throughput=self.producer_throughput)
+            self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic1, group_id="test-consumer-group-1",
+                                            new_consumer=False, consumer_timeout_ms=60000, message_validator=is_int)
+            self.start_producer_and_consumer()
+
+            # Pick a replica of topic1's partition and take its first log directory offline by removing write permission.
+            # We assume that topic1's partition is created in the first log directory of the respective brokers.
+            broker_node = select_node(self, broker_type, self.topic1)
+            broker_idx = self.kafka.idx(broker_node)
+            assert broker_idx in self.kafka.isr_idx_list(self.topic1), \
+                   "Broker %d should be in isr set %s" % (broker_idx, str(self.kafka.isr_idx_list(self.topic1)))
+
+            self.logger.debug("Making log dir %s inaccessible" % (KafkaService.DATA_LOG_DIR_1))
+            cmd = "chmod a-w %s -R" % (KafkaService.DATA_LOG_DIR_1)
+            broker_node.account.ssh(cmd, allow_fail=False)
+
+            if bounce_broker:
+                self.kafka.restart_node(broker_node, clean_shutdown=True)
+
+            # Verify the following:
+            # 1) The broker with offline log directory is not the leader of the partition of topic1
+            # 2) The broker with offline log directory is not in the ISR
+            # 3) The broker with offline log directory is still online
+            # 4) Messages can still be produced and consumed from topic1
+            wait_until(lambda: self.kafka.leader(self.topic1, partition=0) != broker_node,
+                       timeout_sec=60,
+                       err_msg="Broker %d should not be leader of topic %s and partition 0" % (broker_idx, self.topic1))
+            assert self.kafka.alive(broker_node), "Broker %d should still be online" % (broker_idx)
+            wait_until(lambda: broker_idx not in self.kafka.isr_idx_list(self.topic1),
+                       timeout_sec=60,
+                       err_msg="Broker %d should not be in isr set %s" % (broker_idx, str(self.kafka.isr_idx_list(self.topic1))))
+
+            self.stop_producer_and_consumer()
+            self.validate()
+
+            # Shutdown all other brokers so that the broker with offline log dir is the only online broker
+            offline_nodes = []
+            for node in self.kafka.nodes:
+                if broker_node != node:
+                    offline_nodes.append(node)
+                    self.logger.debug("Hard shutdown broker %d" % (self.kafka.idx(node)))
+                    self.kafka.stop_node(node)
+
+            # Verify the following:
+            # 1) The broker with the offline log directory is the only in-sync replica of topic2's partition
+            # 2) Messages can still be produced and consumed from topic2
+            self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic2,
+                                               throughput=self.producer_throughput, offline_nodes=offline_nodes)
+            self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic2, group_id="test-consumer-group-2",
+                                            new_consumer=False, consumer_timeout_ms=60000, message_validator=is_int)
+            self.start_producer_and_consumer()
+
+            assert self.kafka.isr_idx_list(self.topic2) == [broker_idx], \
+                   "In-sync replicas of topic %s and partition 0 should be %s" % (self.topic2, str([broker_idx]))
+
+            self.stop_producer_and_consumer()
+            self.validate()
+
+        except BaseException:
+            for s in self.test_context.services:
+                self.mark_for_collect(s)
+            raise

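The heart of the new test is polling cluster state with ducktape's wait_until until leadership and ISR membership have moved off the broker whose log directory was made read-only, while the broker process itself stays up. A condensed sketch of that verification pattern (kafka, broker_node and broker_idx are assumed to come from a running KafkaService inside a ducktape test):

from ducktape.utils.util import wait_until

def wait_for_log_dir_eviction(kafka, topic, broker_node, broker_idx, timeout_sec=60):
    # The broker with the offline log dir should give up leadership of partition 0 ...
    wait_until(lambda: kafka.leader(topic, partition=0) != broker_node,
               timeout_sec=timeout_sec,
               err_msg="Broker %d should not be leader of topic %s and partition 0" % (broker_idx, topic))
    # ... and drop out of the ISR, while the broker process itself stays alive.
    wait_until(lambda: broker_idx not in kafka.isr_idx_list(topic),
               timeout_sec=timeout_sec,
               err_msg="Broker %d should not be in the ISR of topic %s" % (broker_idx, topic))
    assert kafka.alive(broker_node), "Broker %d should still be online" % broker_idx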
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/tests/kafkatest/tests/core/transactions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py
index 9ccb259..d8f9e5c 100644
--- a/tests/kafkatest/tests/core/transactions_test.py
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -219,7 +219,8 @@ class TransactionsTest(Test):
         security_protocol = 'PLAINTEXT'
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = security_protocol
-        self.kafka.logs["kafka_data"]["collect_default"] = True
+        self.kafka.logs["kafka_data_1"]["collect_default"] = True
+        self.kafka.logs["kafka_data_2"]["collect_default"] = True
         self.kafka.logs["kafka_operational_logs_debug"]["collect_default"] = True
         self.kafka.start()
         input_messages = self.seed_messages()

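With JBOD there is no longer a single data directory to collect, so the old kafka_data log entry is split into one entry per log directory. A hedged sketch of what such a ducktape log registry typically looks like (the paths below are illustrative placeholders, not copied from the commit):

# Per-service registry of artifacts that ducktape can collect after a test run.
logs = {
    "kafka_data_1": {"path": "/mnt/kafka/kafka-data-logs-1", "collect_default": False},
    "kafka_data_2": {"path": "/mnt/kafka/kafka-data-logs-2", "collect_default": False},
    "kafka_operational_logs_debug": {"path": "/mnt/kafka/debug", "collect_default": False},
}

# A test that wants the broker data directories archived opts in per entry:
logs["kafka_data_1"]["collect_default"] = True
logs["kafka_data_2"]["collect_default"] = True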
