kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: KAFKA-1298 (followup); Controlled shutdown tool doesn't seem to work out of the box; patched by Sriharsha Chintalapani; reviewed by Jun Rao
Date Tue, 10 Jun 2014 17:35:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6d1992274 -> b04a3be54


KAFKA-1298 (followup); Controlled shutdown tool doesn't seem to work out of the box;  patched
by Sriharsha Chintalapani; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b04a3be5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b04a3be5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b04a3be5

Branch: refs/heads/trunk
Commit: b04a3be54e872599a1e3a0fda488de5f19ce7c24
Parents: 6d19922
Author: Sriharsha Chintalapani <schintalapani@hortonworks.com>
Authored: Tue Jun 10 10:35:32 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Jun 10 10:35:32 2014 -0700

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 34 ++++++++++---------
 .../kafka/api/ProducerFailureHandlingTest.scala |  6 ++--
 .../kafka/api/ProducerSendTest.scala            |  6 ++--
 .../unit/kafka/admin/AddPartitionsTest.scala    | 10 +++---
 .../test/scala/unit/kafka/admin/AdminTest.scala | 14 ++++----
 .../unit/kafka/admin/DeleteTopicTest.scala      |  4 +--
 .../kafka/integration/RollingBounceTest.scala   |  8 ++---
 .../unit/kafka/producer/ProducerTest.scala      |  8 ++---
 .../unit/kafka/producer/SyncProducerTest.scala  |  3 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |  6 ++--
 .../unit/kafka/server/LogRecoveryTest.scala     | 10 +++---
 .../unit/kafka/server/ReplicaFetchTest.scala    |  4 +--
 .../test/scala/unit/kafka/utils/TestUtils.scala | 35 +++++++++++---------
 13 files changed, 75 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b04a3be5/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e776423..d0cf5f1 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -249,22 +249,24 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient,
val brokerSt
           // Move leadership serially to relinquish lock.
           inLock(controllerContext.controllerLock) {
             controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch
=>
-              if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id && replicationFactor
> 1) {
-                // If the broker leads the topic partition, transition the leader and update
isr. Updates zk and
-                // notifies all affected brokers
-                partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
-                  controlledShutdownPartitionLeaderSelector)
-              } else {
-                // Stop the replica first. The state change below initiates ZK changes which
should take some time
-                // before which the stop replica request should be completed (in most cases)
-                brokerRequestBatch.newBatch()
-                brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
-                  topicAndPartition.partition, deletePartition = false)
-                brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
-
-                // If the broker is a follower, updates the isr in ZK and notifies the current
leader
-                replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
-                  topicAndPartition.partition, id)), OfflineReplica)
+              if (replicationFactor > 1) {
+                if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
+                  // If the broker leads the topic partition, transition the leader and update
isr. Updates zk and
+                  // notifies all affected brokers
+                  partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
+                    controlledShutdownPartitionLeaderSelector)
+                } else {
+                  // Stop the replica first. The state change below initiates ZK changes
which should take some time
+                  // before which the stop replica request should be completed (in most cases)
+                  brokerRequestBatch.newBatch()
+                  brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
+                    topicAndPartition.partition, deletePartition = false)
+                  brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
+
+                  // If the broker is a follower, updates the isr in ZK and notifies the
current leader
+                  replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
+                    topicAndPartition.partition, id)), OfflineReplica)
+                }
               }
             }
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b04a3be5/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index cd4ca2f..b9405cf 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -50,8 +50,8 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
   private var producer3: KafkaProducer = null
   private var producer4: KafkaProducer = null
 
-  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
+  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
   props1.put("auto.create.topics.enable", "false")
   props2.put("auto.create.topics.enable", "false")
   private val config1 = new KafkaConfig(props1)
@@ -333,4 +333,4 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
       producer.close
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b04a3be5/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 3c2bf36..34a7db4 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -45,8 +45,8 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
 
-  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
+  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
   props1.put("num.partitions", "4")
   props2.put("num.partitions", "4")
   private val config1 = new KafkaConfig(props1)
@@ -255,4 +255,4 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
       }
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b04a3be5/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index fcd5eee..1bf2667 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -37,10 +37,10 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness
{
   val port3 = TestUtils.choosePort()
   val port4 = TestUtils.choosePort()
 
-  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
-  val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3)
-  val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4)
+  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
+  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
+  val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, false)
+  val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, false)
 
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
   var brokers: Seq[Broker] = Seq.empty[Broker]
@@ -205,4 +205,4 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness
{
     assertEquals(partition6DataForTopic3.replicas(2).id, 2)
     assertEquals(partition6DataForTopic3.replicas(3).id, 3)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b04a3be5/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 4f6ddca..e289798 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -145,7 +145,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging
{
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new
KafkaConfig(b)))
+    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new
KafkaConfig(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // reassign partition 0
@@ -176,7 +176,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging
{
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new
KafkaConfig(b)))
+    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new
KafkaConfig(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // reassign partition 0
@@ -207,7 +207,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging
{
     val expectedReplicaAssignment = Map(0  -> List(0, 1))
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new
KafkaConfig(b)))
+    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new
KafkaConfig(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // reassign partition 0
@@ -236,7 +236,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging
{
   def testReassigningNonExistingPartition() {
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new
KafkaConfig(b)))
+    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new
KafkaConfig(b)))
     // reassign partition 0
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
@@ -262,7 +262,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging
{
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition
-> newReplicas))
     reassignPartitionsCommand.reassignPartitions
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(2).map(b => TestUtils.createServer(new
KafkaConfig(b)))
+    val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(new
KafkaConfig(b)))
 
     // wait until reassignment completes
     TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient),
@@ -298,7 +298,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging
{
     val partition = 1
     val preferredReplica = 0
     // create brokers
-    val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
+    val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(new KafkaConfig(_))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
@@ -318,7 +318,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging
{
     val topic = "test"
     val partition = 1
     // create brokers
-    val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
+    val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(new KafkaConfig(_))
     val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
     // create the topic
     TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment,
servers = servers)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b04a3be5/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 1b3c04e..5d3c57a 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -101,7 +101,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topic = "test"
     val topicAndPartition = TopicAndPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(4)
+    val brokerConfigs = TestUtils.createBrokerConfigs(4, false)
     brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
     val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
@@ -258,7 +258,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
   private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topicAndPartition = TopicAndPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(3)
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
     brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
     val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/b04a3be5/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index 5eee08a..eab4b5f 100644
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -35,15 +35,11 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness
{
   val port3 = TestUtils.choosePort()
   val port4 = TestUtils.choosePort()
 
-  val enableShutdown = true
+  // controlled.shutdown.enable is true by default
   val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  configProps1.put("controlled.shutdown.enable", "true")
   val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
-  configProps2.put("controlled.shutdown.enable", "true")
   val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3)
-  configProps3.put("controlled.shutdown.enable", "true")
   val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4)
-  configProps4.put("controlled.shutdown.enable", "true")
   configProps4.put("controlled.shutdown.retry.backoff.ms", "100")
 
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
@@ -111,4 +107,4 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness
{
     // Start the server back up again
     servers(prevLeader).startup()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b04a3be5/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index b61c0b8..dd71d81 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -48,10 +48,10 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with
Logging{
   private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
   private var servers = List.empty[KafkaServer]
 
-  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
   props1.put("num.partitions", "4")
   private val config1 = new KafkaConfig(props1)
-  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
   props2.put("num.partitions", "4")
   private val config2 = new KafkaConfig(props2)
 
@@ -314,7 +314,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with
Logging{
     // make sure we don't wait fewer than timeoutMs
     assertTrue((t2-t1) >= timeoutMs)
   }
-  
+
   @Test
   def testSendNullMessage() {
     val producer = TestUtils.createProducer[String, String](
@@ -332,7 +332,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with
Logging{
         "Topic new-topic not created after timeout",
         waitTime = zookeeper.tickTime)
       TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0)
-    
+
       producer.send(new KeyedMessage[String, String]("new-topic", "key", null))
     } finally {
       producer.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/b04a3be5/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 0dec9ec..24deea0 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -31,7 +31,8 @@ import kafka.common.{TopicAndPartition, ErrorMapping}
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private var messageBytes =  new Array[Byte](2);
-  val configs = List(new KafkaConfig(TestUtils.createBrokerConfigs(1).head))
+  // turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down
request handler pool.
+  val configs = List(new KafkaConfig(TestUtils.createBrokerConfigs(1, false).head))
   val zookeeperConnect = TestZKUtils.zookeeperConnect
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/b04a3be5/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 25dffcf..c2ba07c 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -34,8 +34,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port1 = TestUtils.choosePort()
   val port2 = TestUtils.choosePort()
 
-  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
+  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
   var staleControllerEpochDetected = false
@@ -146,4 +146,4 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness
{
       case _ => false
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b04a3be5/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index b349fce..0ec120a 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -31,7 +31,7 @@ import org.junit.Assert._
 
 class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
-  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+  val configs = TestUtils.createBrokerConfigs(2, false).map(new KafkaConfig(_) {
     override val replicaLagTimeMaxMs = 5000L
     override val replicaLagMaxMessages = 10L
     override val replicaFetchWaitMaxMs = 1000
@@ -52,7 +52,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   var hwFile1: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps1.logDirs(0),
ReplicaManager.HighWatermarkFilename))
   var hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0),
ReplicaManager.HighWatermarkFilename))
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
-  
+
   override def setUp() {
     super.setUp()
 
@@ -131,7 +131,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     sendMessages(1)
     hw += 1
-      
+
     // give some time for follower 1 to record leader HW of 60
     TestUtils.waitUntilTrue(() =>
       server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
@@ -162,7 +162,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     sendMessages(2)
     var hw = 2L
-    
+
     // allow some time for the follower to get the leader HW
     TestUtils.waitUntilTrue(() =>
       server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
@@ -188,7 +188,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     sendMessages(2)
     hw += 2
-    
+
     // allow some time for the follower to get the leader HW
     TestUtils.waitUntilTrue(() =>
       server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw,

http://git-wip-us.apache.org/repos/asf/kafka/blob/b04a3be5/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 3e0bc18..da4bafc 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -27,7 +27,7 @@ import junit.framework.Assert._
 import kafka.common._
 
 class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
-  val props = createBrokerConfigs(2)
+  val props = createBrokerConfigs(2,false)
   val configs = props.map(p => new KafkaConfig(p))
   var brokers: Seq[KafkaServer] = null
   val topic1 = "foo"
@@ -73,4 +73,4 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
     }
     waitUntilTrue(logsMatch, "Broker logs should be identical")
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b04a3be5/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 4da0f2c..12f8045 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -48,7 +48,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
  * Utility functions to help with testing
  */
 object TestUtils extends Logging {
-  
+
   val IoTmpDir = System.getProperty("java.io.tmpdir")
 
   val Letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
@@ -127,9 +127,10 @@ object TestUtils extends Logging {
   /**
    * Create a test config for the given node id
    */
-  def createBrokerConfigs(numConfigs: Int): List[Properties] = {
+  def createBrokerConfigs(numConfigs: Int,
+    enableControlledShutdown: Boolean = true): List[Properties] = {
     for((port, node) <- choosePorts(numConfigs).zipWithIndex)
-    yield createBrokerConfig(node, port)
+    yield createBrokerConfig(node, port, enableControlledShutdown)
   }
 
   def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
@@ -139,7 +140,8 @@ object TestUtils extends Logging {
   /**
    * Create a test config for the given node id
    */
-  def createBrokerConfig(nodeId: Int, port: Int = choosePort()): Properties = {
+  def createBrokerConfig(nodeId: Int, port: Int = choosePort(),
+    enableControlledShutdown: Boolean = true): Properties = {
     val props = new Properties
     props.put("broker.id", nodeId.toString)
     props.put("host.name", "localhost")
@@ -147,6 +149,7 @@ object TestUtils extends Logging {
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put("zookeeper.connect", TestZKUtils.zookeeperConnect)
     props.put("replica.socket.timeout.ms", "1500")
+    props.put("controlled.shutdown.enable", enableControlledShutdown.toString)
     props
   }
 
@@ -340,7 +343,7 @@ object TestUtils extends Logging {
    * Create a producer with a few pre-configured properties.
    * If certain properties need to be overridden, they can be provided in producerProps.
    */
-  def createProducer[K, V](brokerList: String, 
+  def createProducer[K, V](brokerList: String,
                            encoder: String = classOf[DefaultEncoder].getName,
                            keyEncoder: String = classOf[DefaultEncoder].getName,
                            partitioner: String = classOf[DefaultPartitioner].getName,
@@ -445,9 +448,9 @@ object TestUtils extends Logging {
   /**
    * Create a wired format request based on simple basic information
    */
-  def produceRequest(topic: String, 
-                     partition: Int, 
-                     message: ByteBufferMessageSet, 
+  def produceRequest(topic: String,
+                     partition: Int,
+                     message: ByteBufferMessageSet,
                      acks: Int = SyncProducerConfig.DefaultRequiredAcks,
                      timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs,
                      correlationId: Int = 0,
@@ -455,10 +458,10 @@ object TestUtils extends Logging {
     produceRequestWithAcks(Seq(topic), Seq(partition), message, acks, timeout, correlationId,
clientId)
   }
 
-  def produceRequestWithAcks(topics: Seq[String], 
-                             partitions: Seq[Int], 
-                             message: ByteBufferMessageSet, 
-                             acks: Int = SyncProducerConfig.DefaultRequiredAcks, 
+  def produceRequestWithAcks(topics: Seq[String],
+                             partitions: Seq[Int],
+                             message: ByteBufferMessageSet,
+                             acks: Int = SyncProducerConfig.DefaultRequiredAcks,
                              timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs,
                              correlationId: Int = 0,
                              clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest
= {
@@ -540,7 +543,7 @@ object TestUtils extends Logging {
 
     return leader
   }
-  
+
   /**
    * Execute the given block. If it throws an assert error, retry. Repeat
    * until no error is thrown or the time limit ellapses
@@ -554,7 +557,7 @@ object TestUtils extends Logging {
         return
       } catch {
         case e: AssertionFailedError =>
-          val ellapsed = System.currentTimeMillis - startTime 
+          val ellapsed = System.currentTimeMillis - startTime
           if(ellapsed > maxWaitMs) {
             throw e
           } else {
@@ -631,7 +634,7 @@ object TestUtils extends Logging {
 
     leader
   }
-  
+
   def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
     val file = new RandomAccessFile(fileName, "rw")
     file.seek(position)
@@ -639,7 +642,7 @@ object TestUtils extends Logging {
       file.writeByte(random.nextInt(255))
     file.close()
   }
-  
+
   def appendNonsenseToFile(fileName: File, size: Int) {
     val file = new FileOutputStream(fileName, true)
     for(i <- 0 until size)


Mime
View raw message