kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/3] kafka git commit: KAFKA-5694; Add AlterReplicaDirRequest and DescribeReplicaDirRequest (KIP-113 part-1)
Date Sun, 03 Sep 2017 06:21:03 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b2a328daf -> adefc8ea0


http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index dadd002..ce16971 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -12,6 +12,9 @@
   */
 package kafka.admin
 
+import java.util.Collections
+import java.util.Properties
+
 import kafka.admin.ReassignPartitionsCommand._
 import kafka.common.{AdminCommandFailedException, TopicAndPartition}
 import kafka.server.{KafkaConfig, KafkaServer}
@@ -22,14 +25,24 @@ import kafka.zk.ZooKeeperTestHarness
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.{After, Before, Test}
 import kafka.admin.ReplicationQuotaUtils._
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.clients.admin.{AdminClient => JAdminClient}
+import org.apache.kafka.common.TopicPartitionReplica
+
+import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.collection.Seq
+import scala.util.Random
+
+import java.io.File
 
 class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   val partitionId = 0
   var servers: Seq[KafkaServer] = null
   val topicName = "my-topic"
   val delayMs = 1000
+  var adminClient: JAdminClient = null
+
   def zkUpdateDelay(): Unit = Thread.sleep(delayMs)
 
   @Before
@@ -38,12 +51,29 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
   }
 
   def startBrokers(brokerIds: Seq[Int]) {
-    servers = brokerIds.map(i => createBrokerConfig(i, zkConnect))
+    servers = brokerIds.map(i => createBrokerConfig(i, zkConnect, logDirCount = 3))
       .map(c => createServer(KafkaConfig.fromProps(c)))
   }
 
+  def createAdminClient(servers: Seq[KafkaServer]): JAdminClient = {
+    val props = new Properties()
+    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers))
+    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+    JAdminClient.create(props)
+  }
+
+  def getRandomLogDirAssignment(brokerId: Int): String = {
+    val server = servers.find(_.config.brokerId == brokerId).get
+    val logDirs = server.config.logDirs
+    new File(logDirs(Random.nextInt(logDirs.size))).getAbsolutePath
+  }
+
   @After
   override def tearDown() {
+    if (adminClient != null) {
+      adminClient.close()
+      adminClient = null
+    }
     TestUtils.shutdownServers(servers)
     super.tearDown()
   }
@@ -52,16 +82,22 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
   def shouldMoveSinglePartition(): Unit = {
     //Given a single replica on server 100
     startBrokers(Seq(100, 101))
+    adminClient = createAdminClient(servers)
     val partition = 0
+    // Get a random log directory on broker 101
+    val expectedLogDir = getRandomLogDirAssignment(101)
     createTopic(zkUtils, topicName, Map(partition -> Seq(100)), servers = servers)
 
     //When we move the replica on 100 to broker 101
-    val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101]}]}"""
-    ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
+    val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101],"log_dirs":["$expectedLogDir"]}]}"""
+    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient), topicJson, NoThrottle)
     waitForReassignmentToComplete()
 
     //Then the replica should be on 101
     assertEquals(Seq(101), zkUtils.getPartitionAssignmentForTopics(Seq(topicName)).get(topicName).get(partition))
+    // The replica should be in the expected log directory on broker 101
+    val replica = new TopicPartitionReplica(topicName, 0, 101)
+    assertEquals(expectedLogDir, adminClient.describeReplicaLogDir(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir)
   }
 
   @Test
@@ -69,6 +105,9 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging
{
     //Given partitions on 2 of 3 brokers
     val brokers = Array(100, 101, 102)
     startBrokers(brokers)
+    adminClient = createAdminClient(servers)
+    // Get a random log directory on broker 102
+    val expectedLogDir = getRandomLogDirAssignment(102)
     createTopic(zkUtils, topicName, Map(
       0 -> Seq(100, 101),
       1 -> Seq(100, 101),
@@ -77,12 +116,19 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
 
     //When rebalancing
     val newAssignment = generateAssignment(zkUtils, brokers, json(topicName), true)._1
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), NoThrottle)
+    // Find a partition on broker 102
+    val partition = newAssignment.find { case (replica, brokerIds) => brokerIds.contains(102)}.get._1.partition
+    val replica = new TopicPartitionReplica(topicName, partition, 102)
+    val newReplicaAssignment = Map(replica -> expectedLogDir)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient),
+      ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, newReplicaAssignment), NoThrottle)
     waitForReassignmentToComplete()
 
     //Then the replicas should span all three brokers
     val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
     assertEquals(Seq(100, 101, 102), actual.values.flatten.toSeq.distinct.sorted)
+    // The replica should be in the expected log directory on broker 102
+    assertEquals(expectedLogDir, adminClient.describeReplicaLogDir(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir)
   }
 
   @Test
@@ -98,7 +144,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging
{
 
     //When rebalancing
     val newAssignment = generateAssignment(zkUtils, Array(100, 101), json(topicName), true)._1
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, None,
+      ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty), NoThrottle)
     waitForReassignmentToComplete()
 
     //Then replicas should only span the first two brokers
@@ -111,6 +158,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
     //Given partitions on 3 of 3 brokers
     val brokers = Array(100, 101, 102)
     startBrokers(brokers)
+    adminClient = createAdminClient(servers)
     createTopic(zkUtils, "topic1", Map(
       0 -> Seq(100, 101),
       1 -> Seq(101, 102),
@@ -125,11 +173,20 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
     val proposed: Map[TopicAndPartition, Seq[Int]] = Map(
       TopicAndPartition("topic1", 0) -> Seq(100, 102),
       TopicAndPartition("topic1", 2) -> Seq(100, 102),
+      TopicAndPartition("topic2", 1) -> Seq(101, 100),
       TopicAndPartition("topic2", 2) -> Seq(100, 102)
     )
 
+    val replica1 = new TopicPartitionReplica("topic1", 0, 102)
+    val replica2 = new TopicPartitionReplica("topic2", 1, 100)
+    val proposedReplicaAssignment: Map[TopicPartitionReplica, String] = Map(
+      replica1 -> getRandomLogDirAssignment(102),
+      replica2 -> getRandomLogDirAssignment(100)
+    )
+
     //When rebalancing
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(proposed), NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient),
+      ReassignPartitionsCommand.formatAsReassignmentJson(proposed, proposedReplicaAssignment), NoThrottle)
     waitForReassignmentToComplete()
 
     //Then the proposed changes should have been made
@@ -138,8 +195,13 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
     assertEquals(Seq(101, 102), actual("topic1")(1))
     assertEquals(Seq(100, 102), actual("topic1")(2))//changed
     assertEquals(Seq(100, 101), actual("topic2")(0))
-    assertEquals(Seq(101, 102), actual("topic2")(1))
+    assertEquals(Seq(101, 100), actual("topic2")(1))//changed
     assertEquals(Seq(100, 102), actual("topic2")(2))//changed
+
+    // The replicas should be in the expected log directories
+    val replicaDirs = adminClient.describeReplicaLogDir(List(replica1, replica2).asJavaCollection).all().get()
+    assertEquals(proposedReplicaAssignment(replica1), replicaDirs.get(replica1).getCurrentReplicaLogDir)
+    assertEquals(proposedReplicaAssignment(replica2), replicaDirs.get(replica2).getCurrentReplicaLogDir)
   }
 
   @Test
@@ -164,7 +226,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
     val newAssignment = generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
 
     val start = System.currentTimeMillis()
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), initialThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, None,
+      ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty), initialThrottle)
 
     //Check throttle config. Should be throttling replica 0 on 100 and 102 only.
     checkThrottleConfigAddedToZK(initialThrottle.value, servers, topicName, "0:100,0:101", "0:102")
@@ -216,7 +279,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
       TopicAndPartition("topic1", 2) -> Seq(103, 104), //didn't move
       TopicAndPartition("topic2", 2) -> Seq(103, 104)  //didn't move
     )
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(throttle))
+    ReassignPartitionsCommand.executeAssignment(zkUtils, None,
+      ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty), Throttle(throttle))
 
     //Check throttle config. Should be throttling specific replicas for each topic.
     checkThrottleConfigAddedToZK(throttle, servers, "topic1",
@@ -245,13 +309,14 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
     //Start rebalance
     val newAssignment = generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
 
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(initialThrottle))
+    ReassignPartitionsCommand.executeAssignment(zkUtils, None,
+      ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty), Throttle(initialThrottle))
 
     //Check throttle config
     checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:101", "0:102")
 
     //Ensure that running Verify, whilst the command is executing, should have no effect
-    verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
+    verifyAssignment(zkUtils, None, ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty))
 
     //Check throttle config again
     checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:101", "0:102")
@@ -259,7 +324,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
     //Now re-run the same assignment with a larger throttle, which should only act to increase the throttle and make progress
     val newThrottle = initialThrottle * 1000
 
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(newThrottle))
+    ReassignPartitionsCommand.executeAssignment(zkUtils, None,
+      ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty), Throttle(newThrottle))
 
     //Check throttle was changed
     checkThrottleConfigAddedToZK(newThrottle, servers, topicName, "0:100,0:101", "0:102")
@@ -268,7 +334,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
     waitForReassignmentToComplete()
 
     //Verify should remove the throttle
-    verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
+    verifyAssignment(zkUtils, None, ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty))
 
     //Check removed
     checkThrottleConfigRemovedFromZK(topicName, servers)
@@ -286,7 +352,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
 
     //When we execute an assignment that includes an invalid partition (1:101 in this case)
     val topicJson = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":1,"replicas":[101]}]}"""
-    ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, None, topicJson, NoThrottle)
   }
 
   @Test(expected = classOf[AdminCommandFailedException])
@@ -297,7 +363,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
 
     //When we execute an assignment that specifies an empty replica list (0: empty list in this case)
     val topicJson = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[]}]}"""
-    ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, None, topicJson, NoThrottle)
   }
 
   @Test(expected = classOf[AdminCommandFailedException])
@@ -308,7 +374,46 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
 
     //When we execute an assignment that specifies an invalid brokerID (102: invalid broker ID in this case)
     val topicJson = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101, 102]}]}"""
-    ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, None, topicJson, NoThrottle)
+  }
+
+  @Test(expected = classOf[AdminCommandFailedException])
+  def shouldFailIfProposedHasInvalidLogDir() {
+    // Given a single replica on server 100
+    startBrokers(Seq(100, 101))
+    adminClient = createAdminClient(servers)
+    createTopic(zkUtils, topicName, Map(0 -> Seq(100)), servers = servers)
+
+    // When we execute an assignment that specifies an invalid log directory
+    val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101],"log_dirs":["invalidDir"]}]}"""
+    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient), topicJson, NoThrottle)
+  }
+
+  @Test(expected = classOf[AdminCommandFailedException])
+  def shouldFailIfProposedMoveReplicaWithinBroker() {
+    // Given a single replica on server 100
+    startBrokers(Seq(100, 101))
+    adminClient = createAdminClient(servers)
+    val logDir = getRandomLogDirAssignment(100)
+    createTopic(zkUtils, topicName, Map(0 -> Seq(100)), servers = servers)
+
+    // When we execute an assignment that specifies log directory for an existing replica on the broker
+    // This test can be removed after KIP-113 is fully implemented, which allows us to change log directory of existing replicas on a broker
+    val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[100],"log_dirs":["$logDir"]}]}"""
+    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient), topicJson, NoThrottle)
+  }
+
+  @Test(expected = classOf[AdminCommandFailedException])
+  def shouldFailIfProposedHasInconsistentReplicasAndLogDirs() {
+    // Given a single replica on server 100
+    startBrokers(Seq(100, 101))
+    adminClient = createAdminClient(servers)
+    val logDir = getRandomLogDirAssignment(100)
+    createTopic(zkUtils, topicName, Map(0 -> Seq(100)), servers = servers)
+
+    // When we execute an assignment whose length of log_dirs doesn't match that of replicas
+    val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101],"log_dirs":["$logDir", "$logDir"]}]}"""
+    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient), topicJson, NoThrottle)
   }
 
   @Test
@@ -333,7 +438,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
     )
 
     //When we run a throttled reassignment
-    new ReassignPartitionsCommand(zkUtils, move).reassignPartitions(throttle)
+    new ReassignPartitionsCommand(zkUtils, None, move).reassignPartitions(throttle)
 
     waitForReassignmentToComplete()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index c75c28a..09c9ea8 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -40,7 +40,7 @@ class ReassignPartitionsCommandTest extends Logging {
   @Test
   def shouldFindMovingReplicas() {
     val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
-    val assigner = new ReassignPartitionsCommand(null, null)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null)
 
     //Given partition 0 moves from broker 100 -> 102. Partition 1 does not move.
     val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101), control)
@@ -61,7 +61,7 @@ class ReassignPartitionsCommandTest extends Logging {
 
   @Test
   def shouldFindMovingReplicasWhenProposedIsSubsetOfExisting() {
-    val assigner = new ReassignPartitionsCommand(null, null)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null)
 
     //Given we have more existing partitions than we are proposing
     val existingSuperset = Map(
@@ -94,7 +94,7 @@ class ReassignPartitionsCommandTest extends Logging {
   @Test
   def shouldFindMovingReplicasMultiplePartitions() {
     val control = TopicAndPartition("topic1", 2) -> Seq(100, 102)
-    val assigner = new ReassignPartitionsCommand(null, null)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null)
 
     //Given partitions 0 & 1 moves from broker 100 -> 102. Partition 2 does not move.
     val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101), TopicAndPartition("topic1", 1) -> Seq(100, 101), control)
@@ -117,7 +117,7 @@ class ReassignPartitionsCommandTest extends Logging {
   @Test
   def shouldFindMovingReplicasMultipleTopics() {
     val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
-    val assigner = new ReassignPartitionsCommand(null, null)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null)
 
     //Given topics 1 -> move from broker 100 -> 102, topics 2 -> move from broker 101 -> 100
     val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101), TopicAndPartition("topic2", 0) -> Seq(101, 102), control)
@@ -146,7 +146,7 @@ class ReassignPartitionsCommandTest extends Logging {
 
   @Test
   def shouldFindMovingReplicasMultipleTopicsAndPartitions() {
-    val assigner = new ReassignPartitionsCommand(null, null)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null)
 
     //Given
     val existing = Map(
@@ -186,7 +186,7 @@ class ReassignPartitionsCommandTest extends Logging {
   @Test
   def shouldFindTwoMovingReplicasInSamePartition() {
     val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
-    val assigner = new ReassignPartitionsCommand(null, null)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null)
 
     //Given partition 0 has 2 moves from broker 102 -> 104 & 103 -> 105
     val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101, 102, 103), control)
@@ -209,7 +209,7 @@ class ReassignPartitionsCommandTest extends Logging {
   @Test
   def shouldNotOverwriteEntityConfigsWhenUpdatingThrottledReplicas(): Unit = {
     val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
-    val assigner = new ReassignPartitionsCommand(null, null)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null)
     val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101), control)
     val proposed = Map(TopicAndPartition("topic1", 0) -> Seq(101, 102), control)
 
@@ -243,7 +243,7 @@ class ReassignPartitionsCommandTest extends Logging {
     val zk = stubZK(existing)
     val admin = createMock(classOf[AdminUtilities])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
-    val assigner = new ReassignPartitionsCommand(zk, proposed, admin)
+    val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
     expect(admin.fetchEntityConfig(is(zk), anyString(), anyString())).andStubReturn(new Properties)
     expect(admin.changeBrokerConfig(is(zk), anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
     replay(admin)
@@ -269,7 +269,7 @@ class ReassignPartitionsCommandTest extends Logging {
     val zk = stubZK(existing)
     val admin = createMock(classOf[AdminUtilities])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
-    val assigner = new ReassignPartitionsCommand(zk, proposed, admin)
+    val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
     expect(admin.changeBrokerConfig(is(zk), anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
 
     //Expect the existing broker config to be changed from 10/100 to 1000
@@ -303,7 +303,7 @@ class ReassignPartitionsCommandTest extends Logging {
     val zk = stubZK(existing)
     val admin = createMock(classOf[AdminUtilities])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
-    val assigner = new ReassignPartitionsCommand(zk, proposed, admin)
+    val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
     expect(admin.changeBrokerConfig(is(zk), anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
 
     //Given there is some existing config

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index dd59e60..9794b1a 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -275,7 +275,7 @@ class LogManagerTest {
   @Test
   def testRecoveryDirectoryMappingWithRelativeDirectory() {
     logManager.shutdown()
-    logDir = new File("data" + File.separator + logDir.getName)
+    logDir = new File("data" + File.separator + logDir.getName).getAbsoluteFile
     logDir.mkdirs()
     logDir.deleteOnExit()
     logManager = createLogManager()

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/server/AlterReplicaDirRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AlterReplicaDirRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterReplicaDirRequestTest.scala
new file mode 100644
index 0000000..6e22444
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/AlterReplicaDirRequestTest.scala
@@ -0,0 +1,83 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import kafka.network.SocketServer
+import kafka.utils._
+import java.io.File
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests._
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class AlterReplicaDirRequestTest extends BaseRequestTest {
+
+  override def numBrokers: Int = 1
+  override def logDirCount: Int = 5
+
+  val topic = "topic"
+
+  @Test
+  def testAlterReplicaDirRequestBeforeTopicCreation() {
+    val partitionNum = 5
+    val logDir = new File(servers.head.config.logDirs.head).getAbsolutePath
+    val partitionDirs = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir).toMap
+    val alterReplicaDirResponse = sendAlterReplicaDirRequest(partitionDirs)
+
+    // The response should show error REPLICA_NOT_AVAILABLE for all partitions
+    (0 until partitionNum).foreach { partition =>
+      val tp = new TopicPartition(topic, partition)
+      assertEquals(Errors.REPLICA_NOT_AVAILABLE, alterReplicaDirResponse.responses().get(tp))
+      assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+    }
+
+    TestUtils.createTopic(zkUtils, topic, partitionNum, 1, servers)
+    (0 until partitionNum).foreach { partition =>
+      assertEquals(logDir, servers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent)
+    }
+  }
+
+  @Test
+  def testAlterReplicaDirRequestErrorCode(): Unit = {
+    val validDir = new File(servers.head.config.logDirs.head).getAbsolutePath
+    val offlineDir = new File(servers.head.config.logDirs.tail.head).getAbsolutePath
+    servers.head.logDirFailureChannel.maybeAddOfflineLogDir(offlineDir, "", new java.io.IOException())
+    TestUtils.createTopic(zkUtils, topic, 3, 1, servers)
+
+    val partitionDirs = mutable.Map.empty[TopicPartition, String]
+    partitionDirs.put(new TopicPartition(topic, 0), "invalidDir")
+    partitionDirs.put(new TopicPartition(topic, 1), validDir)
+    partitionDirs.put(new TopicPartition(topic, 2), offlineDir)
+
+    val alterReplicaDirResponse = sendAlterReplicaDirRequest(partitionDirs.toMap)
+    assertEquals(Errors.LOG_DIR_NOT_FOUND, alterReplicaDirResponse.responses().get(new TopicPartition(topic, 0)))
+    assertEquals(Errors.NONE, alterReplicaDirResponse.responses().get(new TopicPartition(topic, 1)))
+    assertEquals(Errors.KAFKA_STORAGE_ERROR, alterReplicaDirResponse.responses().get(new TopicPartition(topic, 2)))
+  }
+
+  private def sendAlterReplicaDirRequest(partitionDirs: Map[TopicPartition, String], socketServer: SocketServer = controllerSocketServer): AlterReplicaDirResponse = {
+    val request = new AlterReplicaDirRequest.Builder(partitionDirs.asJava).build()
+    val response = connectAndSend(request, ApiKeys.ALTER_REPLICA_DIR, socketServer)
+    AlterReplicaDirResponse.parse(response, request.version)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index fffe3a8..a2ff35e 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -36,6 +36,8 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
   // If required, set number of brokers
   protected def numBrokers: Int = 3
 
+  protected def logDirCount: Int = 1
+
   // If required, override properties by mutating the passed Properties object
   protected def propertyOverrides(properties: Properties) {}
 
@@ -43,7 +45,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
     val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
       enableControlledShutdown = false, enableDeleteTopic = true,
       interBrokerSecurityProtocol = Some(securityProtocol),
-      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
     props.foreach(propertyOverrides)
     props.map(KafkaConfig.fromProps)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
new file mode 100644
index 0000000..353c180
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
@@ -0,0 +1,64 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import kafka.utils._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests._
+import org.junit.Assert._
+import org.junit.Test
+import java.io.File
+
+class DescribeLogDirsRequestTest extends BaseRequestTest {
+
+  override def numBrokers: Int = 1
+  override def logDirCount: Int = 2
+
+  val topic = "topic"
+  val partitionNum = 2
+  val tp0 = new TopicPartition(topic, 0)
+  val tp1 = new TopicPartition(topic, 1)
+
+  @Test
+  def testDescribeLogDirsRequest(): Unit = {
+    val onlineDir = new File(servers.head.config.logDirs.head).getAbsolutePath
+    val offlineDir = new File(servers.head.config.logDirs.tail.head).getAbsolutePath
+    servers.head.replicaManager.handleLogDirFailure(offlineDir)
+    TestUtils.createTopic(zkUtils, topic, partitionNum, 1, servers)
+    TestUtils.produceMessages(servers, topic, 10)
+
+    val request = new DescribeLogDirsRequest.Builder(null).build()
+    val response = connectAndSend(request, ApiKeys.DESCRIBE_LOG_DIRS, controllerSocketServer)
+    val logDirInfos = DescribeLogDirsResponse.parse(response, request.version).logDirInfos()
+
+    assertEquals(logDirCount, logDirInfos.size())
+    assertEquals(Errors.KAFKA_STORAGE_ERROR, logDirInfos.get(offlineDir).error)
+    assertEquals(0, logDirInfos.get(offlineDir).replicaInfos.size())
+
+    assertEquals(Errors.NONE, logDirInfos.get(onlineDir).error)
+    val replicaInfo0 = logDirInfos.get(onlineDir).replicaInfos.get(tp0)
+    val replicaInfo1 = logDirInfos.get(onlineDir).replicaInfos.get(tp1)
+    assertEquals(servers.head.logManager.getLog(tp0).get.size, replicaInfo0.size)
+    assertEquals(servers.head.logManager.getLog(tp1).get.size, replicaInfo1.size)
+    assertTrue(servers.head.logManager.getLog(tp0).get.logEndOffset > 0)
+    assertEquals(servers.head.replicaManager.getLogEndOffsetLag(tp0), replicaInfo0.offsetLag)
+    assertEquals(servers.head.replicaManager.getLogEndOffsetLag(tp1), replicaInfo1.offsetLag)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index aadb4d2..aec68e2 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -49,6 +49,7 @@ class RequestQuotaTest extends BaseRequestTest {
   private val topic = "topic-1"
   private val numPartitions = 1
   private val tp = new TopicPartition(topic, 0)
+  private val logDir = "logDir"
   private val unthrottledClientId = "unthrottled-client"
   private val brokerId: Integer = 0
   private var leaderNode: KafkaServer = null
@@ -290,6 +291,12 @@ class RequestQuotaTest extends BaseRequestTest {
                 new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
               ))), true)
 
+        case ApiKeys.ALTER_REPLICA_DIR =>
+          new AlterReplicaDirRequest.Builder(Collections.singletonMap(tp, logDir))
+
+        case ApiKeys.DESCRIBE_LOG_DIRS =>
+          new DescribeLogDirsRequest.Builder(Collections.singleton(tp))
+
         case _ =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)
     }
@@ -381,6 +388,8 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.DELETE_ACLS => new DeleteAclsResponse(response).throttleTimeMs
       case ApiKeys.DESCRIBE_CONFIGS => new DescribeConfigsResponse(response).throttleTimeMs
       case ApiKeys.ALTER_CONFIGS => new AlterConfigsResponse(response).throttleTimeMs
+      case ApiKeys.ALTER_REPLICA_DIR => new AlterReplicaDirResponse(response).throttleTimeMs
+      case ApiKeys.DESCRIBE_LOG_DIRS => new DescribeLogDirsResponse(response).throttleTimeMs
       case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2a08311..a52c83c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -197,19 +197,23 @@ object TestUtils extends Logging {
     *
    * Note that if `interBrokerSecurityProtocol` is defined, the listener for the `SecurityProtocol` will be enabled.
     */
-  def createBrokerConfig(nodeId: Int, zkConnect: String,
-    enableControlledShutdown: Boolean = true,
-    enableDeleteTopic: Boolean = false,
-    port: Int = RandomPort,
-    interBrokerSecurityProtocol: Option[SecurityProtocol] = None,
-    trustStoreFile: Option[File] = None,
-    saslProperties: Option[Properties] = None,
-    enablePlaintext: Boolean = true,
-    enableSaslPlaintext: Boolean = false, saslPlaintextPort: Int = RandomPort,
-    enableSsl: Boolean = false, sslPort: Int = RandomPort,
-    enableSaslSsl: Boolean = false, saslSslPort: Int = RandomPort, rack: Option[String] = None, logDirCount: Int = 1)
-  : Properties = {
-
+  def createBrokerConfig(nodeId: Int,
+                         zkConnect: String,
+                         enableControlledShutdown: Boolean = true,
+                         enableDeleteTopic: Boolean = false,
+                         port: Int = RandomPort,
+                         interBrokerSecurityProtocol: Option[SecurityProtocol] = None,
+                         trustStoreFile: Option[File] = None,
+                         saslProperties: Option[Properties] = None,
+                         enablePlaintext: Boolean = true,
+                         enableSaslPlaintext: Boolean = false,
+                         saslPlaintextPort: Int = RandomPort,
+                         enableSsl: Boolean = false,
+                         sslPort: Int = RandomPort,
+                         enableSaslSsl: Boolean = false,
+                         saslSslPort: Int = RandomPort,
+                         rack: Option[String] = None,
+                         logDirCount: Int = 1): Properties = {
    def shouldEnable(protocol: SecurityProtocol) = interBrokerSecurityProtocol.fold(false)(_ == protocol)
 
     val protocolAndPorts = ArrayBuffer[(SecurityProtocol, Int)]()


Mime
View raw message