kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch 3.0 updated: KAFKA-13165: Validate KRaft node id, process role and quorum voters (#11179)
Date Mon, 09 Aug 2021 20:02:03 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 385eb29  KAFKA-13165: Validate KRaft node id, process role and quorum voters (#11179)
385eb29 is described below

commit 385eb296b87c2afedc557a8b2d479b031c375308
Author: Ryan Dielhenn <rdielhenn@confluent.io>
AuthorDate: Mon Aug 9 12:57:23 2021 -0700

    KAFKA-13165: Validate KRaft node id, process role and quorum voters (#11179)
    
    Validate that KRaft controllers are members of the KRaft quorum, and non-controllers are
not.
    This validation assumes that controllers and brokers have the same ID only when they are
    co-located.
    
    Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@gmail.com>,
Luke Chen <showuon@gmail.com>
---
 config/kraft/broker.properties                     |  2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 21 ++++--
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    |  5 +-
 .../test/scala/unit/kafka/KafkaConfigTest.scala    | 77 +++++++++++++++++++++-
 .../scala/unit/kafka/raft/RaftManagerTest.scala    | 32 ++++++---
 .../kafka/server/BrokerLifecycleManagerTest.scala  |  1 +
 .../unit/kafka/server/ControllerApisTest.scala     |  1 +
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  2 +
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 12 +++-
 .../unit/kafka/server/KafkaRaftServerTest.scala    |  7 ++
 .../test/scala/unit/kafka/server/ServerTest.scala  |  1 +
 .../scala/unit/kafka/tools/StorageToolTest.scala   |  1 +
 12 files changed, 141 insertions(+), 21 deletions(-)

diff --git a/config/kraft/broker.properties b/config/kraft/broker.properties
index 1b71803..dfbd6ec 100644
--- a/config/kraft/broker.properties
+++ b/config/kraft/broker.properties
@@ -24,7 +24,7 @@
 process.roles=broker
 
 # The node id associated with this instance's roles
-node.id=1
+node.id=2
 
 # The connect string for the controller quorum
 controller.quorum.voters=1@localhost:9093
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 703d796..a30d31d 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -390,6 +390,7 @@ object KafkaConfig {
   val MetadataLogSegmentMillisProp = "metadata.log.segment.ms"
   val MetadataMaxRetentionBytesProp = "metadata.max.retention.bytes"
   val MetadataMaxRetentionMillisProp = "metadata.max.retention.ms"
+  val QuorumVotersProp = RaftConfig.QUORUM_VOTERS_CONFIG
 
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameProp = "authorizer.class.name"
@@ -1919,6 +1920,21 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
         throw new ConfigException(s"Missing configuration `${KafkaConfig.NodeIdProp}` which
is required " +
           s"when `process.roles` is defined (i.e. when running in KRaft mode).")
       }
+
+      // Validate process.roles with controller.quorum.voters
+      val voterIds: Set[Integer] = RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet
+      if (voterIds.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp}
must contain a parseable set of voters.")
+      } else if (processRoles.contains(ControllerRole)) {
+        // Ensure that controllers use their node.id as a voter in controller.quorum.voters

+        require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains
the 'controller' role, the node id $nodeId must be included in the set of voters ${KafkaConfig.QuorumVotersProp}=$voterIds")
+      } else {
+        // Ensure that the broker's node.id is not an id in controller.quorum.voters
+        require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not
contain the 'controller' role, the node id $nodeId must not be included in the set of voters
${KafkaConfig.QuorumVotersProp}=$voterIds")
+      }
+
+      require(getClass(KafkaConfig.AlterConfigPolicyClassNameProp) == null, s"${KafkaConfig.AlterConfigPolicyClassNameProp}
is not supported in KRaft.")
+      require(getClass(KafkaConfig.CreateTopicPolicyClassNameProp) == null, s"${KafkaConfig.CreateTopicPolicyClassNameProp}
is not supported in KRaft.")
     }
     require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
     require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or
equal to 0")
@@ -2008,10 +2024,5 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     require(principalBuilderClass != null, s"${KafkaConfig.PrincipalBuilderClassProp} must
be non-null")
     require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass), 
       s"${KafkaConfig.PrincipalBuilderClassProp} must implement KafkaPrincipalSerde")
-
-    if (usesSelfManagedQuorum) {
-      require(getClass(KafkaConfig.AlterConfigPolicyClassNameProp) == null, s"${KafkaConfig.AlterConfigPolicyClassNameProp}
is not supported in KRaft.")
-      require(getClass(KafkaConfig.CreateTopicPolicyClassNameProp) == null, s"${KafkaConfig.CreateTopicPolicyClassNameProp}
is not supported in KRaft.")
-    }
   }
 }
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 415b694..e02529b 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -17,7 +17,7 @@
 package kafka.raft
 
 import kafka.log.{Defaults, Log, SegmentDeletion}
-import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMillisProp,
MetadataLogSegmentMinBytesProp, NodeIdProp, ProcessRolesProp}
+import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMillisProp,
MetadataLogSegmentMinBytesProp, NodeIdProp, ProcessRolesProp, QuorumVotersProp}
 import kafka.server.{KafkaConfig, KafkaRaftServer}
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.errors.{InvalidConfigurationException, RecordTooLargeException}
@@ -59,7 +59,8 @@ final class KafkaMetadataLogTest {
   def testConfig(): Unit = {
     val props = new Properties()
     props.put(ProcessRolesProp, util.Arrays.asList("broker"))
-    props.put(NodeIdProp, Int.box(1))
+    props.put(QuorumVotersProp, "1@localhost:9092")
+    props.put(NodeIdProp, Int.box(2))
     props.put(MetadataLogSegmentBytesProp, Int.box(10240))
     props.put(MetadataLogSegmentMillisProp, Int.box(10 * 1024))
     assertThrows(classOf[InvalidConfigurationException], () => {
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index d7e130e..bf056e8 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -19,14 +19,16 @@ package kafka
 import java.io.File
 import java.nio.file.Files
 import java.util
+import java.util.Properties
 
 import kafka.server.KafkaConfig
 import kafka.utils.Exit
+import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.internals.FatalExitError
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.api.Assertions._
-import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 
 import scala.jdk.CollectionConverters._
 
@@ -80,6 +82,79 @@ class KafkaTest {
   }
 
   @Test
+  def testBrokerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that brokers do not use their
node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testControllerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that controllers use their
node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testColocatedRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated processes use
their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testMustContainQuorumVotersIfUsingProcessRoles(): Unit = {
+    // Ensure that validation is happening at startup to check that if process.roles is set
controller.quorum.voters is not empty
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that if neither process.roles nor controller.quorum.voters is populated, then
an exception is thrown if zookeeper.connect is not defined
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "")
+    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that no exception is thrown once zookeeper.connect is defined
+    propertiesFile.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  private def setListenerProps(props: Properties): Unit = {
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
+    if (props.getProperty(KafkaConfig.ProcessRolesProp).contains("broker")) {
+      props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
+      props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")

+    }
+  }
+
+  @Test
   def testKafkaSslPasswords(): Unit = {
     val propertiesFile = prepareDefaultConfig()
     val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override",
"ssl.keystore.password=keystore_password",
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 40256ae..827d42e 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -28,27 +28,41 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.raft.KafkaRaftClient
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.Mockito._
 
+import java.io.File
+
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
-    def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String): KafkaConfig
= {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles:
String, nodeId: String) = {
+    def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String, logDir: File):
KafkaConfig = {
       val props = new Properties
+      props.setProperty(KafkaConfig.MetadataLogDirProp, logDir.getPath)
       props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
       props.setProperty(KafkaConfig.NodeIdProp, nodeId)
       props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
       props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
-      props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, nodeId.concat("@localhost:9093"))
-      if (processRoles.contains("broker"))
+      if (processRoles.contains("broker")) {
         props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
         props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+        if (!processRoles.contains("controller")) {
+          val voterId = (nodeId.toInt + 1)
+          props.setProperty(KafkaConfig.QuorumVotersProp, s"${voterId}@localhost:9093")
+        }
+      } 
+
+      if (processRoles.contains("controller")) {
+        props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")
+      }
+
       new KafkaConfig(props)
     }
 
-    val config = configWithProcessRolesAndNodeId(processRoles, nodeId)
+    val logDir = TestUtils.tempDirectory()
+    val config = configWithProcessRolesAndNodeId(processRoles, nodeId, logDir)
     val topicId = new Uuid(0L, 2L)
     val metaProperties = MetaProperties(
       clusterId = Uuid.randomUuid.toString,
@@ -59,7 +73,7 @@ class RaftManagerTest {
       metaProperties,
       config,
       new ByteArraySerde,
-      new TopicPartition("__taft_id_test", 0),
+      topicPartition,
       topicId,
       Time.SYSTEM,
       new Metrics(Time.SYSTEM),
@@ -70,21 +84,21 @@ class RaftManagerTest {
 
   @Test
   def testSentinelNodeIdIfBrokerRoleOnly(): Unit = {
-    val raftManager = instantiateRaftManagerWithConfigs("broker", "1")
+    val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test",
0), "broker", "1")
     assertFalse(raftManager.client.nodeId.isPresent)
     raftManager.shutdown()
   }
 
   @Test
   def testNodeIdPresentIfControllerRoleOnly(): Unit = {
-    val raftManager = instantiateRaftManagerWithConfigs("controller", "1")
+    val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test",
0), "controller", "1")
     assertTrue(raftManager.client.nodeId.getAsInt == 1)
     raftManager.shutdown()
   }
 
   @Test
   def testNodeIdPresentIfColocated(): Unit = {
-    val raftManager = instantiateRaftManagerWithConfigs("controller,broker", "1")
+    val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test",
0), "controller,broker", "1")
     assertTrue(raftManager.client.nodeId.getAsInt == 1)
     raftManager.shutdown()
   }
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 288077c..a551288 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -48,6 +48,7 @@ class BrokerLifecycleManagerTest {
     properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo")
     properties.setProperty(KafkaConfig.ProcessRolesProp, "broker")
     properties.setProperty(KafkaConfig.NodeIdProp, "1")
+    properties.setProperty(KafkaConfig.QuorumVotersProp, s"2@localhost:9093")
     properties.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp, "300000")
     properties
   }
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 7ab8a0b..768c06a 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -96,6 +96,7 @@ class ControllerApisTest {
     props.put(KafkaConfig.NodeIdProp, nodeId: java.lang.Integer)
     props.put(KafkaConfig.ProcessRolesProp, "controller")
     props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
+    props.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9093")
     new ControllerApis(
       requestChannel,
       authorizer,
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 6b91cfb7..c5ba162 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -132,6 +132,8 @@ class KafkaApisTest {
       val properties = TestUtils.createBrokerConfig(brokerId, "")
       properties.put(KafkaConfig.NodeIdProp, brokerId.toString)
       properties.put(KafkaConfig.ProcessRolesProp, "broker")
+      val voterId = (brokerId + 1)
+      properties.put(KafkaConfig.QuorumVotersProp, s"$voterId@localhost:9093")
       properties
     } else {
       TestUtils.createBrokerConfig(brokerId, "zk")
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index b1d54d3..2e38df0 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -262,6 +262,8 @@ class KafkaConfigTest {
     props.put(KafkaConfig.ProcessRolesProp, "controller")
     props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
     props.put(KafkaConfig.NodeIdProp, "1")
+    props.put(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+
     assertFalse(isValidKafkaConfig(props))
     val caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
     assertTrue(caught.getMessage.contains("controller.listener.names cannot be empty if the
server has the controller role"))
@@ -1053,7 +1055,7 @@ class KafkaConfigTest {
 
   private def assertInvalidQuorumVoters(value: String): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value)
+    props.put(KafkaConfig.QuorumVotersProp, value)
     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
@@ -1078,7 +1080,7 @@ class KafkaConfigTest {
 
   private def assertValidQuorumVoters(value: String, expectedVoters: util.Map[Integer, AddressSpec]):
Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value)
+    props.put(KafkaConfig.QuorumVotersProp, value)
     val raftConfig = new RaftConfig(KafkaConfig.fromProps(props))
     assertEquals(expectedVoters, raftConfig.quorumVoterConnections())
   }
@@ -1091,6 +1093,7 @@ class KafkaConfigTest {
     val largeBrokerId = 2000
     val props = new Properties()
     props.put(KafkaConfig.ProcessRolesProp, "broker")
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     props.put(KafkaConfig.NodeIdProp, largeBrokerId.toString)
     assertTrue(isValidKafkaConfig(props))
   }
@@ -1117,6 +1120,7 @@ class KafkaConfigTest {
     val props = new Properties()
     props.put(KafkaConfig.ProcessRolesProp, "broker")
     props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     assertFalse(isValidKafkaConfig(props))
   }
 
@@ -1193,6 +1197,7 @@ class KafkaConfigTest {
     props.put(KafkaConfig.ProcessRolesProp, "broker")
     props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
     props.put(KafkaConfig.NodeIdProp, "1")
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     assertTrue(isValidKafkaConfig(props))
   }
 
@@ -1206,6 +1211,7 @@ class KafkaConfigTest {
     props.put(KafkaConfig.MetadataLogDirProp, metadataDir)
     props.put(KafkaConfig.LogDirProp, dataDir)
     props.put(KafkaConfig.NodeIdProp, "1")
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     assertTrue(isValidKafkaConfig(props))
 
     val config = KafkaConfig.fromProps(props)
@@ -1222,11 +1228,11 @@ class KafkaConfigTest {
     props.put(KafkaConfig.ProcessRolesProp, "broker")
     props.put(KafkaConfig.LogDirProp, s"$dataDir1,$dataDir2")
     props.put(KafkaConfig.NodeIdProp, "1")
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     assertTrue(isValidKafkaConfig(props))
 
     val config = KafkaConfig.fromProps(props)
     assertEquals(dataDir1, config.metadataLogDir)
     assertEquals(Seq(dataDir1, dataDir2), config.logDirs)
   }
-
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index 6990a34..4b4a86b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -40,6 +40,7 @@ class KafkaRaftServerTest {
     configProperties.put(KafkaConfig.ProcessRolesProp, "broker,controller")
     configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
     configProperties.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://127.0.0.1:9092")
+    configProperties.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9092")
     configProperties.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
 
     val (loadedMetaProperties, offlineDirs) =
@@ -60,6 +61,7 @@ class KafkaRaftServerTest {
 
     configProperties.put(KafkaConfig.ProcessRolesProp, "controller")
     configProperties.put(KafkaConfig.NodeIdProp, configNodeId.toString)
+    configProperties.put(KafkaConfig.QuorumVotersProp, s"$configNodeId@localhost:9092")
     configProperties.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
 
     assertThrows(classOf[InconsistentNodeIdException], () =>
@@ -105,6 +107,7 @@ class KafkaRaftServerTest {
     val configProperties = new Properties
     configProperties.put(KafkaConfig.ProcessRolesProp, "broker")
     configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
+    configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
     configProperties.put(KafkaConfig.LogDirProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
     val config = KafkaConfig.fromProps(configProperties)
 
@@ -124,6 +127,7 @@ class KafkaRaftServerTest {
     val invalidDir = TestUtils.tempFile("blah")
     val configProperties = new Properties
     configProperties.put(KafkaConfig.ProcessRolesProp, "broker")
+    configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
     configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
     configProperties.put(KafkaConfig.MetadataLogDirProp, invalidDir.getAbsolutePath)
     configProperties.put(KafkaConfig.LogDirProp, validDir.getAbsolutePath)
@@ -146,6 +150,7 @@ class KafkaRaftServerTest {
     val configProperties = new Properties
     configProperties.put(KafkaConfig.ProcessRolesProp, "broker")
     configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
+    configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
     configProperties.put(KafkaConfig.MetadataLogDirProp, validDir.getAbsolutePath)
     configProperties.put(KafkaConfig.LogDirProp, invalidDir.getAbsolutePath)
     val config = KafkaConfig.fromProps(configProperties)
@@ -174,6 +179,7 @@ class KafkaRaftServerTest {
     val configProperties = new Properties
     configProperties.put(KafkaConfig.ProcessRolesProp, "broker")
     configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
+    configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
     configProperties.put(KafkaConfig.MetadataLogDirProp, metadataDir.getAbsolutePath)
     configProperties.put(KafkaConfig.LogDirProp, dataDir.getAbsolutePath)
     val config = KafkaConfig.fromProps(configProperties)
@@ -194,6 +200,7 @@ class KafkaRaftServerTest {
 
     val configProperties = new Properties
     configProperties.put(KafkaConfig.ProcessRolesProp, "broker")
+    configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
     configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
     configProperties.put(KafkaConfig.LogDirProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
     val config = KafkaConfig.fromProps(configProperties)
diff --git a/core/src/test/scala/unit/kafka/server/ServerTest.scala b/core/src/test/scala/unit/kafka/server/ServerTest.scala
index e79a46b..79f8cd9 100644
--- a/core/src/test/scala/unit/kafka/server/ServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerTest.scala
@@ -35,6 +35,7 @@ class ServerTest {
     val props = new Properties()
     props.put(KafkaConfig.ProcessRolesProp, "broker")
     props.put(KafkaConfig.NodeIdProp, nodeId.toString)
+    props.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
     val config = KafkaConfig.fromProps(props)
 
     val context = Server.createKafkaMetricsContext(config, clusterId)
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index f46f082..0242c33 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -37,6 +37,7 @@ class StorageToolTest {
     properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo,/tmp/bar")
     properties.setProperty(KafkaConfig.ProcessRolesProp, "controller")
     properties.setProperty(KafkaConfig.NodeIdProp, "2")
+    properties.setProperty(KafkaConfig.QuorumVotersProp, s"2@localhost:9092")
     properties.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
     properties
   }

Mime
View raw message