kafka-commits mailing list archives

From cmcc...@apache.org
Subject [kafka] branch trunk updated: MINOR: Support Raft-based metadata quorums in system tests (#10093)
Date Thu, 11 Feb 2021 17:45:39 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new faaef2c  MINOR: Support Raft-based metadata quorums in system tests (#10093)
faaef2c is described below

commit faaef2c2dff3208fe3ed23f4b075260475543957
Author: Ron Dagostino <rdagostino@confluent.io>
AuthorDate: Thu Feb 11 12:44:17 2021 -0500

    MINOR: Support Raft-based metadata quorums in system tests (#10093)
    
    We need to be able to run system tests with Raft-based metadata quorums -- both
    co-located brokers and controllers as well as remote controllers -- in addition to the
    ZooKeeper-based mode we run today. This PR adds this capability to KafkaService in a
    backwards-compatible manner as follows.
    
    If no changes are made to existing system tests, they function as they always have --
    they instantiate ZooKeeper, and Kafka will use ZooKeeper. On the other hand, if we want
    to use a Raft-based metadata quorum we can do so by introducing a metadata_quorum
    argument to the test method and using @matrix to set it to the quorums we want to use for
    the various runs of the test. We then also have to skip creating a ZooKeeperService when
    the quorum is Raft-based.
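    
    As a rough sketch (the test itself is hypothetical, and quorum.zk and
    quorum.all are assumed convenience constants in the new quorum module,
    alongside the quorum.colocated_raft constant that kafka.py references
    below):
    
        from ducktape.mark import matrix
        from ducktape.tests.test import Test
        
        from kafkatest.services.kafka import KafkaService, quorum
        from kafkatest.services.zookeeper import ZookeeperService
        
        class ExampleQuorumTest(Test):
            @matrix(metadata_quorum=quorum.all)  # one run per quorum type
            def test_something(self, metadata_quorum=quorum.zk):
                # KafkaService picks up metadata_quorum from the test context
                # via quorum.ServiceQuorumInfo, so only ZooKeeper needs to be
                # special-cased here
                zk = ZookeeperService(self.test_context, num_nodes=1) \
                    if metadata_quorum == quorum.zk else None
                if zk:
                    zk.start()
                kafka = KafkaService(self.test_context, num_nodes=3, zk=zk)
                kafka.start()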
    
    This PR does not update any tests -- those will come later after all the KIP-500 code is
    merged.
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>
---
 config/raft-broker.properties                      | 125 ++++++
 config/raft-combined.properties                    | 125 ++++++
 config/raft-controller.properties                  | 124 ++++++
 tests/kafkatest/services/kafka/config.py           |   3 -
 tests/kafkatest/services/kafka/config_property.py  |   5 +
 tests/kafkatest/services/kafka/kafka.py            | 436 ++++++++++++++++++---
 tests/kafkatest/services/kafka/quorum.py           | 144 +++++++
 .../services/kafka/templates/kafka.properties      |  32 +-
 8 files changed, 930 insertions(+), 64 deletions(-)

diff --git a/config/raft-broker.properties b/config/raft-broker.properties
new file mode 100644
index 0000000..b93ad1c
--- /dev/null
+++ b/config/raft-broker.properties
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The role of this server. Setting this puts us in KIP-500 mode
+process.roles=broker
+
+# The node id associated with this instance's roles
+node.id=1
+
+# The connect string for the controller quorum
+controller.quorum.voters=1@localhost:9093
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on. It will get the value returned from
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+#   FORMAT:
+#     listeners = listener_name://host_name:port
+#   EXAMPLE:
+#     listeners = PLAINTEXT://your.host.name:9092
+listeners=PLAINTEXT://localhost:9092
+inter.broker.listener.name=PLAINTEXT
+
+# Hostname and port the broker will advertise to producers and consumers. If not set,
+# it uses the value for "listeners" if configured.  Otherwise, it will use the value
+# returned from java.net.InetAddress.getCanonicalHostName().
+advertised.listeners=PLAINTEXT://localhost:9092
+
+# Listener, host name, and port for the controller to advertise to the brokers. If
+# this server is a controller, this listener must be configured.
+controller.listener.names=CONTROLLER
+
+# Maps listener names to security protocols; the default is for them to be the same. See the config documentation for more details
+listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma-separated list of directories under which to store log files
+log.dirs=/tmp/raft-broker-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# Increasing this value is recommended for installations with data dirs located in a RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings  #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
diff --git a/config/raft-combined.properties b/config/raft-combined.properties
new file mode 100644
index 0000000..1d71b2f
--- /dev/null
+++ b/config/raft-combined.properties
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The role of this server. Setting this puts us in KIP-500 mode
+process.roles=broker,controller
+
+# The node id associated with this instance's roles
+node.id=1
+
+# The connect string for the controller quorum
+controller.quorum.voters=1@localhost:9093
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on. It will get the value returned from
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+#   FORMAT:
+#     listeners = listener_name://host_name:port
+#   EXAMPLE:
+#     listeners = PLAINTEXT://your.host.name:9092
+listeners=PLAINTEXT://:9092,CONTROLLER://:9093
+inter.broker.listener.name=PLAINTEXT
+
+# Hostname and port the broker will advertise to producers and consumers. If not set,
+# it uses the value for "listeners" if configured.  Otherwise, it will use the value
+# returned from java.net.InetAddress.getCanonicalHostName().
+advertised.listeners=PLAINTEXT://localhost:9092
+
+# Listener, host name, and port for the controller to advertise to the brokers. If
+# this server is a controller, this listener must be configured.
+controller.listener.names=CONTROLLER
+
+# Maps listener names to security protocols; the default is for them to be the same. See the config documentation for more details
+listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma-separated list of directories under which to store log files
+log.dirs=/tmp/raft-combined-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# Increasing this value is recommended for installations with data dirs located in a RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings  #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
diff --git a/config/raft-controller.properties b/config/raft-controller.properties
new file mode 100644
index 0000000..a8fbf92
--- /dev/null
+++ b/config/raft-controller.properties
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The role of this server. Setting this puts us in KIP-500 mode
+process.roles=controller
+
+# The node id associated with this instance's roles
+node.id=1
+
+# The connect string for the controller quorum
+controller.quorum.voters=1@localhost:9093
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on. It will get the value returned from
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+#   FORMAT:
+#     listeners = listener_name://host_name:port
+#   EXAMPLE:
+#     listeners = PLAINTEXT://your.host.name:9092
+listeners=PLAINTEXT://:9093
+
+# Hostname and port the broker will advertise to producers and consumers. If not set,
+# it uses the value for "listeners" if configured.  Otherwise, it will use the value
+# returned from java.net.InetAddress.getCanonicalHostName().
+#advertised.listeners=PLAINTEXT://your.host.name:9092
+
+# Listener, host name, and port for the controller to advertise to the brokers. If
+# this server is a controller, this listener must be configured.
+controller.listener.names=PLAINTEXT
+
+# Maps listener names to security protocols; the default is for them to be the same. See the config documentation for more details
+#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma-separated list of directories under which to store log files
+log.dirs=/tmp/raft-controller-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# Increasing this value is recommended for installations with data dirs located in a RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings  #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
diff --git a/tests/kafkatest/services/kafka/config.py b/tests/kafkatest/services/kafka/config.py
index 64e96e3..d440fcf 100644
--- a/tests/kafkatest/services/kafka/config.py
+++ b/tests/kafkatest/services/kafka/config.py
@@ -22,11 +22,8 @@ class KafkaConfig(dict):
     """
 
     DEFAULTS = {
-        config_property.PORT: 9092,
         config_property.SOCKET_RECEIVE_BUFFER_BYTES: 65536,
         config_property.LOG_DIRS: "/mnt/kafka/kafka-data-logs-1,/mnt/kafka/kafka-data-logs-2",
-        config_property.ZOOKEEPER_CONNECTION_TIMEOUT_MS: 18000,
-        config_property.ZOOKEEPER_SESSION_TIMEOUT_MS: 18000
     }
 
     def __init__(self, **kwargs):
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index 6749c3b..2222c16 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -18,6 +18,11 @@ Define Kafka configuration property names here.
 """
 
 BROKER_ID = "broker.id"
+NODE_ID = "node.id"
+FIRST_BROKER_PORT = 9092
+FIRST_CONTROLLER_PORT = FIRST_BROKER_PORT + 500
+FIRST_CONTROLLER_ID = 3001
+CLUSTER_ID = "6bd37820-6745-4790-ae98-620300e1f61b"
 PORT = "port"
 ADVERTISED_HOSTNAME = "advertised.host.name"
 
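The constants above drive node-id and port assignment in kafka.py (next file).
As an illustrative sketch (not part of the patch) of how per-listener ports
line up -- the SECURITY_PROTOCOLS ordering mirrors the list added to
KafkaService:

    FIRST_BROKER_PORT = 9092
    FIRST_CONTROLLER_PORT = FIRST_BROKER_PORT + 500
    SECURITY_PROTOCOLS = ['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL']

    def listener_ports(security_protocol):
        # a broker listener port and its controller-quorum counterpart,
        # as assigned by KafkaService's port mappings
        idx = SECURITY_PROTOCOLS.index(security_protocol)
        return FIRST_BROKER_PORT + idx, FIRST_CONTROLLER_PORT + idx

    assert listener_ports('PLAINTEXT') == (9092, 9592)
    assert listener_ports('SASL_SSL') == (9095, 9595)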
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 4da2ab2..2073252 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -24,9 +24,8 @@ from ducktape.utils.util import wait_until
 from ducktape.cluster.remoteaccount import RemoteCommandError
 
 from .config import KafkaConfig
-from kafkatest.version import KafkaVersion
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
-from kafkatest.services.kafka import config_property
+from kafkatest.services.kafka import config_property, quorum
 from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.security.minikdc import MiniKdc
 from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
@@ -53,6 +52,94 @@ class KafkaListener:
         return "%s:%s" % (self.name, self.security_protocol)
 
 class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
+    """
+    Ducktape system test service for Brokers and Raft-based Controllers
+
+    Metadata Quorums
+    ----------------
+    Kafka can use either ZooKeeper or a Raft Controller quorum for its
+    metadata.  See the kafkatest.services.kafka.quorum.ServiceQuorumInfo
+    class for details.
+
+    Attributes
+    ----------
+
+    quorum_info : kafkatest.services.kafka.quorum.ServiceQuorumInfo
+        Information about the service and its metadata quorum
+    num_nodes_broker_role : int
+        The number of nodes in the service that include 'broker'
+        in process.roles (0 when using ZooKeeper)
+    num_nodes_controller_role : int
+        The number of nodes in the service that include 'controller'
+        in process.roles (0 when using ZooKeeper)
+    controller_quorum : KafkaService
+        None when using ZooKeeper, otherwise the Kafka service for the
+        co-located case or the remote controller quorum service
+        instance for the remote case
+    remote_controller_quorum : KafkaService
+        None for the co-located case or when using ZooKeeper, otherwise
+        the remote controller quorum service instance
+
+    Kafka Security Protocols
+    ------------------------
+    The security protocol advertised to clients and the inter-broker
+    security protocol can be set in the constructor and can be changed
+    afterwards as well.  Set these attributes to make changes; they
+    take effect when starting each node:
+
+    security_protocol : str
+        default PLAINTEXT
+    client_sasl_mechanism : str
+        default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL
+    interbroker_security_protocol : str
+        default PLAINTEXT
+    interbroker_sasl_mechanism : str
+        default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL
+
+    ZooKeeper
+    ---------
+    Create an instance of ZookeeperService when metadata_quorum is ZK
+    (ZK is the default if metadata_quorum is not a test parameter).
+
+    Raft Quorums
+    ------------
+    Set metadata_quorum accordingly (to COLOCATED_RAFT or REMOTE_RAFT).
+    Do not instantiate a ZookeeperService instance.
+
+    Starting Kafka will cause any remote controller quorum to
+    automatically start first.  Explicitly stopping Kafka does not stop
+    any remote controller quorum, but Ducktape will stop both when
+    tearing down the test (it will stop Kafka first).
+
+    Raft Security Protocols
+    -----------------------
+    The broker-to-controller and inter-controller security protocols
+    will both initially be set to the inter-broker security protocol.
+    The broker-to-controller and inter-controller security protocols
+    must be identical for the co-located case (an exception will be
+    thrown when trying to start the service if they are not identical).
+    The broker-to-controller and inter-controller security protocols
+    can differ in the remote case.
+
+    Set these attributes for the co-located case.  Changes take effect
+    when starting each node:
+
+    controller_security_protocol : str
+        default PLAINTEXT
+    controller_sasl_mechanism : str
+        default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL
+    intercontroller_security_protocol : str
+        default PLAINTEXT
+    intercontroller_sasl_mechanism : str
+        default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL
+
+    Set the same attributes for the remote case (changes take effect
+    when starting each quorum node), but you must first obtain the
+    service instance for the remote quorum via one of the
+    'controller_quorum' or 'remote_controller_quorum' attributes as
+    defined above.
+
+    """
     PERSISTENT_ROOT = "/mnt/kafka"
     STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "server-start-stdout-stderr.log")
     LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "kafka-log4j.properties")
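As a usage sketch of the attributes described in the docstring above (the
REMOTE_RAFT parameterization, test_context, and node count are illustrative):

    # in a test running with metadata_quorum=REMOTE_RAFT
    kafka = KafkaService(test_context, num_nodes=3, zk=None)

    # the broker-to-controller and inter-controller security settings default
    # to the inter-broker security protocol; change them on the remote quorum
    # before starting if something else is desired
    kafka.remote_controller_quorum.controller_security_protocol = 'SASL_SSL'
    kafka.remote_controller_quorum.intercontroller_security_protocol = 'SASL_SSL'

    kafka.start()  # any remote controller quorum is started first, automatically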
@@ -74,6 +161,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/jaas.conf"
     ADMIN_CLIENT_AS_BROKER_JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/admin_client_as_broker_jaas.conf"
     KRB5_CONF = "java.security.krb5.conf=/mnt/security/krb5.conf"
+    SECURITY_PROTOCOLS = [SecurityConfig.PLAINTEXT, SecurityConfig.SSL, SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL]
 
     logs = {
         "kafka_server_start_stdout_stderr": {
@@ -103,14 +191,45 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                  jmx_attributes=None, zk_connect_timeout=18000, zk_session_timeout=18000, server_prop_overides=None, zk_chroot=None,
                  zk_client_secure=False,
                  listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None,
-                 extra_kafka_opts="", tls_version=None):
+                 extra_kafka_opts="", tls_version=None,
+                 remote_kafka=None,
+                 controller_num_nodes_override=0,
+                 ):
         """
         :param context: test context
+        :param int num_nodes: the number of nodes in the service.  There are 4 possibilities:
+            1) ZooKeeper quorum:
+                The number of brokers is defined by this parameter.
+            2) Co-located Raft quorum:
+                The number of nodes having a broker role is defined by this parameter.
+                The number of nodes having a controller role will by default be 1, 3, or 5 depending on num_nodes
+                (1 if num_nodes < 3, otherwise 3 if num_nodes < 5, otherwise 5).  This calculation
+                can be overridden via controller_num_nodes_override, which must be between 1 and num_nodes,
+                inclusive, when non-zero.  Here are some possibilities:
+                num_nodes = 1:
+                    node 0: process.roles=broker+controller
+                num_nodes = 2:
+                    node 0: process.roles=broker+controller
+                    node 1: process.roles=broker
+                num_nodes = 3:
+                    node 0: process.roles=broker+controller
+                    node 1: process.roles=broker+controller
+                    node 2: process.roles=broker+controller
+                num_nodes = 3, controller_num_nodes_override = 1:
+                    node 0: process.roles=broker+controller
+                    node 1: process.roles=broker
+                    node 2: process.roles=broker
+            3) Remote Raft quorum when instantiating the broker service:
+                The number of nodes, all of which will have process.roles=broker, is defined by this parameter.
+            4) Remote Raft quorum when instantiating the controller service:
+                The number of nodes, all of which will have process.roles=controller, is defined by this parameter.
+                The value passed in is determined by the broker service when that is instantiated, and it uses the
+                same algorithm as described above: 1, 3, or 5 unless controller_num_nodes_override is provided.
         :param ZookeeperService zk:
         :param dict topics: which topics to create automatically
         :param str security_protocol: security protocol for clients to use
         :param str tls_version: version of the TLS protocol.
-        :param str interbroker_security_protocol: security protocol to use for broker-to-broker communication
+        :param str interbroker_security_protocol: security protocol to use for broker-to-broker (and Raft controller-to-controller) communication
         :param str client_sasl_mechanism: sasl mechanism for clients to use
         :param str interbroker_sasl_mechanism: sasl mechanism to use for broker-to-broker communication
         :param str authorizer_class_name: which authorizer class to use
@@ -120,18 +239,75 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         :param int zk_connect_timeout:
         :param int zk_session_timeout:
         :param dict server_prop_overides: overrides for kafka.properties file
-        :param zk_chroot:
+        :param str zk_chroot: the ZooKeeper chroot to use, if any
         :param bool zk_client_secure: connect to Zookeeper over secure client port (TLS) when True
         :param ListenerSecurityConfig listener_security_config: listener config to use
-        :param dict per_node_server_prop_overrides:
+        :param dict per_node_server_prop_overrides: overrides for kafka.properties file keyed by 1-based node number
         :param str extra_kafka_opts: jvm args to add to KAFKA_OPTS variable
+        :param KafkaService remote_kafka: the broker service for which this instance acts as the remote controller quorum (this service then runs with process.roles=controller) when not None; ignored when using ZooKeeper
+        :param int controller_num_nodes_override: when positive and using a Raft-based metadata quorum, the number of controller nodes to use instead of the default of 1, 3, or 5 derived from num_nodes; ignored otherwise
+
         """
+
+        self.zk = zk
+        self.remote_kafka = remote_kafka
+        self.quorum_info = quorum.ServiceQuorumInfo(self, context)
+        self.controller_quorum = None # will be set below if necessary
+        self.remote_controller_quorum = None # will be set below if necessary
+
+        if num_nodes < 1:
+            raise Exception("Must set a positive number of nodes: %i" % num_nodes)
+        self.num_nodes_broker_role = 0
+        self.num_nodes_controller_role = 0
+
+        if self.quorum_info.using_raft:
+            if self.quorum_info.has_brokers:
+                self.num_nodes_broker_role = num_nodes
+                if self.quorum_info.has_controllers:
+                    self.num_nodes_controller_role = self.num_raft_controllers(self.num_nodes_broker_role, controller_num_nodes_override)
+                    if self.remote_kafka:
+                        raise Exception("Must not specify remote Kafka service with co-located Controller quorum")
+            else:
+                self.num_nodes_controller_role = num_nodes
+                if not self.remote_kafka:
+                    raise Exception("Must specify remote Kafka service when instantiating remote Controller service (should not happen)")
+
+            # Initially use the inter-broker security protocol for both
+            # broker-to-controller and inter-controller communication. Both can be explicitly changed later if desired.
+            # Note, however, that the two must be the same if the controller quorum is co-located with the
+            # brokers.  Different security protocols for the two are only supported with a remote controller quorum.
+            self.controller_security_protocol = interbroker_security_protocol
+            self.controller_sasl_mechanism = interbroker_sasl_mechanism
+            self.intercontroller_security_protocol = interbroker_security_protocol
+            self.intercontroller_sasl_mechanism = interbroker_sasl_mechanism
+
+            # Ducktape tears down services in the reverse order in which they are created,
+            # so create a service for the remote controller quorum (if we need one) first, before
+            # invoking Service.__init__(), so that Ducktape will tear down the quorum last; otherwise
+            # Ducktape will tear down the controller quorum first, which could lead to problems in
+            # Kafka and delays in tearing it down (and who knows what else -- it's simply better
+            # to correctly tear down Kafka first, before tearing down the remote controller).
+            if self.quorum_info.has_controllers:
+                self.controller_quorum = self
+            else:
+                num_remote_controller_nodes = self.num_raft_controllers(num_nodes, controller_num_nodes_override)
+                self.remote_controller_quorum = KafkaService(
+                    context, num_remote_controller_nodes, None, security_protocol=self.controller_security_protocol,
+                    interbroker_security_protocol=self.intercontroller_security_protocol,
+                    client_sasl_mechanism=self.controller_sasl_mechanism, interbroker_sasl_mechanism=self.intercontroller_sasl_mechanism,
+                    authorizer_class_name=authorizer_class_name, version=version, jmx_object_names=jmx_object_names,
+                    jmx_attributes=jmx_attributes,
+                    listener_security_config=listener_security_config,
+                    extra_kafka_opts=extra_kafka_opts, tls_version=tls_version,
+                    remote_kafka=self,
+                )
+                self.controller_quorum = self.remote_controller_quorum
+
         Service.__init__(self, context, num_nodes)
         JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
                           root=KafkaService.PERSISTENT_ROOT)
 
-        self.zk = zk
-
         self.security_protocol = security_protocol
         self.tls_version = tls_version
         self.client_sasl_mechanism = client_sasl_mechanism
@@ -171,35 +347,79 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         # e.g. brokers to deregister after a hard kill.
         self.zk_session_timeout = zk_session_timeout
 
-        self.port_mappings = {
-            'PLAINTEXT': KafkaListener('PLAINTEXT', 9092, 'PLAINTEXT', False),
-            'SSL': KafkaListener('SSL', 9093, 'SSL', False),
-            'SASL_PLAINTEXT': KafkaListener('SASL_PLAINTEXT', 9094, 'SASL_PLAINTEXT', False),
-            'SASL_SSL': KafkaListener('SASL_SSL', 9095, 'SASL_SSL', False),
+        broker_only_port_mappings = {
             KafkaService.INTERBROKER_LISTENER_NAME:
-                KafkaListener(KafkaService.INTERBROKER_LISTENER_NAME, 9099, None, False)
+                KafkaListener(KafkaService.INTERBROKER_LISTENER_NAME, config_property.FIRST_BROKER_PORT + 7, None, False)
         }
+        controller_only_port_mappings = {}
+        for idx, sec_protocol in enumerate(KafkaService.SECURITY_PROTOCOLS):
+            name_for_controller = self.controller_listener_name(sec_protocol)
+            broker_only_port_mappings[sec_protocol] = KafkaListener(sec_protocol, config_property.FIRST_BROKER_PORT + idx, sec_protocol, False)
+            controller_only_port_mappings[name_for_controller] = KafkaListener(name_for_controller, config_property.FIRST_CONTROLLER_PORT + idx, sec_protocol, False)
+
+        if self.quorum_info.using_zk or (self.quorum_info.has_brokers and not self.quorum_info.has_controllers): # ZK or Raft broker-only
+            self.port_mappings = broker_only_port_mappings
+        elif self.quorum_info.has_brokers_and_controllers: # Raft broker+controller
+            self.port_mappings = broker_only_port_mappings.copy()
+            self.port_mappings.update(controller_only_port_mappings)
+        else: # Raft controller-only
+            self.port_mappings = controller_only_port_mappings
 
         self.interbroker_listener = None
-        self.setup_interbroker_listener(interbroker_security_protocol, self.listener_security_config.use_separate_interbroker_listener)
+        if self.quorum_info.using_zk or self.quorum_info.has_brokers:
+            self.setup_interbroker_listener(interbroker_security_protocol, self.listener_security_config.use_separate_interbroker_listener)
         self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
         self._security_config = None
 
         for node in self.nodes:
+            node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
+
             node.version = version
-            node.config = KafkaConfig(**{
+            raft_broker_configs = {
+                config_property.PORT: config_property.FIRST_BROKER_PORT,
+                config_property.NODE_ID: self.idx(node),
+            }
+            zk_broker_configs = {
+                config_property.PORT: config_property.FIRST_BROKER_PORT,
                 config_property.BROKER_ID: self.idx(node),
                 config_property.ZOOKEEPER_CONNECTION_TIMEOUT_MS: zk_connect_timeout,
                 config_property.ZOOKEEPER_SESSION_TIMEOUT_MS: zk_session_timeout
-            })
+            }
+            controller_only_configs = {
+                config_property.NODE_ID: self.idx(node) + config_property.FIRST_CONTROLLER_ID - 1,
+            }
+            if node_quorum_info.service_quorum_info.using_zk:
+                node.config = KafkaConfig(**zk_broker_configs)
+            elif not node_quorum_info.has_broker_role: # Raft controller-only role
+                node.config = KafkaConfig(**controller_only_configs)
+            else: # Raft broker-only role or combined broker+controller roles
+                node.config = KafkaConfig(**raft_broker_configs)
+
+    def num_raft_controllers(self, num_nodes_broker_role, controller_num_nodes_override):
+        if controller_num_nodes_override < 0:
+            raise Exception("controller_num_nodes_override must not be negative: %i" % controller_num_nodes_override)
+        if controller_num_nodes_override > num_nodes_broker_role and self.quorum_info.quorum_type == quorum.colocated_raft:
+            raise Exception("controller_num_nodes_override must not exceed the service's node count in the co-located case: %i > %i" %
+                            (controller_num_nodes_override, num_nodes_broker_role))
+        if controller_num_nodes_override:
+            return controller_num_nodes_override
+        if num_nodes_broker_role < 3:
+            return 1
+        if num_nodes_broker_role < 5:
+            return 3
+        return 5
 
     def set_version(self, version):
         for node in self.nodes:
             node.version = version
 
+    def controller_listener_name(self, security_protocol_name):
+        return "CONTROLLER_%s" % security_protocol_name
+
     @property
     def interbroker_security_protocol(self):
-        return self.interbroker_listener.security_protocol
+        # TODO: disentangle interbroker and intercontroller protocol information
+        return self.interbroker_listener.security_protocol if self.quorum_info.using_zk or self.quorum_info.has_brokers else self.intercontroller_security_protocol
 
     # this is required for backwards compatibility - there are a lot of tests that set this property explicitly
     # meaning 'use one of the existing listeners that match given security protocol, do not use custom listener'
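The default quorum sizing implemented by num_raft_controllers() above can be
summarized with a small standalone mirror of its no-override path:

    def default_controller_count(num_nodes_broker_role):
        # mirrors KafkaService.num_raft_controllers() when
        # controller_num_nodes_override is 0
        if num_nodes_broker_role < 3:
            return 1
        if num_nodes_broker_role < 5:
            return 3
        return 5

    assert [default_controller_count(n) for n in (1, 2, 3, 4, 5, 9)] == [1, 1, 3, 3, 5, 5]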
@@ -222,21 +442,24 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     @property
     def security_config(self):
         if not self._security_config:
-            self._security_config = SecurityConfig(self.context, self.security_protocol, self.interbroker_listener.security_protocol,
-                                    zk_sasl=self.zk.zk_sasl, zk_tls=self.zk_client_secure,
-                                    client_sasl_mechanism=self.client_sasl_mechanism,
-                                    interbroker_sasl_mechanism=self.interbroker_sasl_mechanism,
-                                    listener_security_config=self.listener_security_config,
-                                    tls_version=self.tls_version)
+            client_sasl_mechanism_to_use = self.client_sasl_mechanism if self.quorum_info.using_zk or self.quorum_info.has_brokers else self.controller_sasl_mechanism
+            interbroker_sasl_mechanism_to_use = self.interbroker_sasl_mechanism if self.quorum_info.using_zk or self.quorum_info.has_brokers else self.intercontroller_sasl_mechanism
+            self._security_config = SecurityConfig(self.context, self.security_protocol, self.interbroker_security_protocol,
+                                                   zk_sasl=self.zk.zk_sasl if self.quorum_info.using_zk else False, zk_tls=self.zk_client_secure,
+                                                   client_sasl_mechanism=client_sasl_mechanism_to_use,
+                                                   interbroker_sasl_mechanism=interbroker_sasl_mechanism_to_use,
+                                                   listener_security_config=self.listener_security_config,
+                                                   tls_version=self.tls_version)
         for port in self.port_mappings.values():
             if port.open:
                 self._security_config.enable_security_protocol(port.security_protocol)
-        if self.zk.zk_sasl:
-            self._security_config.enable_sasl()
-            self._security_config.zk_sasl = self.zk.zk_sasl
-        if self.zk_client_secure:
-            self._security_config.enable_ssl()
-            self._security_config.zk_tls = self.zk_client_secure
+        if self.quorum_info.using_zk:
+            if self.zk.zk_sasl:
+                self._security_config.enable_sasl()
+                self._security_config.zk_sasl = self.zk.zk_sasl
+            if self.zk_client_secure:
+                self._security_config.enable_ssl()
+                self._security_config.zk_tls = self.zk_client_secure
         return self._security_config
 
     def open_port(self, listener_name):
@@ -246,35 +469,62 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.port_mappings[listener_name].open = False
 
     def start_minikdc_if_necessary(self, add_principals=""):
-        if self.security_config.has_sasl:
+        if self.quorum_info.using_zk:
+            has_sasl = self.security_config.has_sasl
+        elif self.quorum_info.has_brokers:
+            has_sasl = self.security_config.has_sasl or self.controller_quorum.security_config.has_sasl
+        else:
+            has_sasl = self.security_config.has_sasl or self.remote_kafka.security_config.has_sasl
+        if has_sasl:
             if self.minikdc is None:
-                self.minikdc = MiniKdc(self.context, self.nodes, extra_principals = add_principals)
-                self.minikdc.start()
+                other_service = self.remote_kafka if self.remote_kafka else self.controller_quorum if self.quorum_info.using_raft else None
+                if not other_service or not other_service.minikdc:
+                    nodes_for_kdc = self.nodes.copy()
+                    if other_service and other_service != self:
+                        nodes_for_kdc += other_service.nodes
+                    self.minikdc = MiniKdc(self.context, nodes_for_kdc, extra_principals = add_principals)
+                    self.minikdc.start()
         else:
             self.minikdc = None
+            if self.quorum_info.using_raft:
+                self.controller_quorum.minikdc = None
+                if self.remote_kafka:
+                    self.remote_kafka.minikdc = None
 
     def alive(self, node):
         return len(self.pids(node)) > 0
 
     def start(self, add_principals=""):
-        if self.zk_client_secure and not self.zk.zk_client_secure_port:
+        if self.quorum_info.using_zk and self.zk_client_secure and not self.zk.zk_client_secure_port:
             raise Exception("Unable to start Kafka: TLS to Zookeeper requested but Zookeeper secure port not enabled")
-        self.open_port(self.security_protocol)
-        self.interbroker_listener.open = True
+        if self.quorum_info.has_brokers_and_controllers and (
+                self.controller_security_protocol != self.intercontroller_security_protocol or
+                self.controller_sasl_mechanism != self.intercontroller_sasl_mechanism):
+            raise Exception("Co-located Raft-based Brokers (%s/%s) and Controllers (%s/%s) cannot talk to Controllers via different security protocols" %
+                            (self.controller_security_protocol, self.controller_sasl_mechanism,
+                             self.intercontroller_security_protocol, self.intercontroller_sasl_mechanism))
+        if self.quorum_info.using_zk or self.quorum_info.has_brokers:
+            self.open_port(self.security_protocol)
+            self.interbroker_listener.open = True
+        # we have to wait to decide whether to open the controller port(s)
+        # because it could be dependent on the particular node in the
+        # co-located case where the number of controllers could be less
+        # than the number of nodes in the service
 
         self.start_minikdc_if_necessary(add_principals)
-        self._ensure_zk_chroot()
+        if self.quorum_info.using_zk:
+            self._ensure_zk_chroot()
 
+        if self.remote_controller_quorum:
+            self.remote_controller_quorum.start()
         Service.start(self)
 
-        self.logger.info("Waiting for brokers to register at ZK")
+        if self.quorum_info.using_zk:
+            self.logger.info("Waiting for brokers to register at ZK")
 
-        retries = 30
-        expected_broker_ids = set(self.nodes)
-        wait_until(lambda: {node for node in self.nodes if self.is_registered(node)} == expected_broker_ids, 30, 1)
+            retries = 30
+            expected_broker_ids = set(self.nodes)
+            wait_until(lambda: {node for node in self.nodes if self.is_registered(node)} == expected_broker_ids, 30, 1)
 
-        if retries == 0:
-            raise RuntimeError("Kafka servers didn't register at ZK within 30 seconds")
+            if retries == 0:
+                raise RuntimeError("Kafka servers didn't register at ZK within 30 seconds")
 
         # Create topics if necessary
         if self.topics is not None:
@@ -300,16 +550,25 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         advertised_listeners = []
         protocol_map = []
 
+        controller_listener_names = self.controller_listener_name_list()
+
         for port in self.port_mappings.values():
             if port.open:
                 listeners.append(port.listener())
-                advertised_listeners.append(port.advertised_listener(node))
+                if port.name not in controller_listener_names:
+                    advertised_listeners.append(port.advertised_listener(node))
                 protocol_map.append(port.listener_security_protocol())
+        controller_sec_protocol = self.remote_controller_quorum.controller_security_protocol if self.remote_controller_quorum \
+            else self.controller_security_protocol if self.quorum_info.has_brokers_and_controllers and not quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role \
+            else None
+        if controller_sec_protocol:
+            protocol_map.append("%s:%s" % (self.controller_listener_name(controller_sec_protocol), controller_sec_protocol))
 
         self.listeners = ','.join(listeners)
         self.advertised_listeners = ','.join(advertised_listeners)
         self.listener_security_protocol_map = ','.join(protocol_map)
-        self.interbroker_bootstrap_servers = self.__bootstrap_servers(self.interbroker_listener, True)
+        if self.quorum_info.using_zk or self.quorum_info.has_brokers:
+            self.interbroker_bootstrap_servers = self.__bootstrap_servers(self.interbroker_listener, True)
 
     def prop_file(self, node):
         self.set_protocol_and_port(node)
@@ -324,13 +583,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
         #load specific test override configs
         override_configs = KafkaConfig(**node.config)
-        override_configs[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
-        override_configs[config_property.ZOOKEEPER_CONNECT] = self.zk_connect_setting()
-        if self.zk_client_secure:
-            override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'true'
-            override_configs[config_property.ZOOKEEPER_CLIENT_CNXN_SOCKET] = 'org.apache.zookeeper.ClientCnxnSocketNetty'
-        else:
-            override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'false'
+        if self.quorum_info.using_zk or self.quorum_info.has_brokers:
+            override_configs[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
+        if self.quorum_info.using_zk:
+            override_configs[config_property.ZOOKEEPER_CONNECT] = self.zk_connect_setting()
+            if self.zk_client_secure:
+                override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'true'
+                override_configs[config_property.ZOOKEEPER_CLIENT_CNXN_SOCKET] = 'org.apache.zookeeper.ClientCnxnSocketNetty'
+            else:
+                override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'false'
 
         for prop in self.server_prop_overides:
             override_configs[prop[0]] = prop[1]
@@ -370,11 +631,40 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                 KafkaService.STDOUT_STDERR_CAPTURE)
         return cmd
 
+    def controller_listener_name_list(self):
+        if self.quorum_info.using_zk:
+            return []
+        broker_to_controller_listener_name = self.controller_listener_name(self.controller_quorum.controller_security_protocol)
+        return [broker_to_controller_listener_name] if (self.controller_quorum.intercontroller_security_protocol == self.controller_quorum.controller_security_protocol) \
+            else [broker_to_controller_listener_name, self.controller_listener_name(self.controller_quorum.intercontroller_security_protocol)]
+
     def start_node(self, node, timeout_sec=60):
         node.account.mkdirs(KafkaService.PERSISTENT_ROOT)
 
+        self.node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
+        if self.quorum_info.has_controllers:
+            for controller_listener in self.controller_listener_name_list():
+                if self.node_quorum_info.has_controller_role:
+                    self.open_port(controller_listener)
+                else: # co-located case where node doesn't have a controller
+                    self.close_port(controller_listener)
+
         self.security_config.setup_node(node)
-        self.maybe_setup_broker_scram_credentials(node)
+        if self.quorum_info.using_zk or self.quorum_info.has_brokers: # TODO: SCRAM currently unsupported for controller quorum
+            self.maybe_setup_broker_scram_credentials(node)
+
+        if self.quorum_info.using_raft:
+            # define controller.quorum.voters text
+            security_protocol_to_use = self.controller_quorum.controller_security_protocol
+            first_node_id = 1 if self.quorum_info.has_brokers_and_controllers else config_property.FIRST_CONTROLLER_ID
+            self.controller_quorum_voters = ','.join(["%s@%s:%s" %
+                                                      (self.controller_quorum.idx(node) + first_node_id - 1,
+                                                       node.account.hostname,
+                                                       config_property.FIRST_CONTROLLER_PORT +
+                                                       KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
+                                                      for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
+            # define controller.listener.names
+            self.controller_listener_names = ','.join(self.controller_listener_name_list())
 
         prop_file = self.prop_file(node)
         self.logger.info("kafka.properties:")
@@ -382,6 +672,13 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         node.account.create_file(KafkaService.CONFIG_FILE, prop_file)
         node.account.create_file(self.LOG4J_CONFIG, self.render('log4j.properties', log_dir=KafkaService.OPERATIONAL_LOG_DIR))
 
+        if self.quorum_info.using_raft:
+            # format log directories if necessary
+            kafka_storage_script = self.path.script("kafka-storage.sh", node)
+            cmd = "%s format --ignore-formatted --config %s --cluster-id %s" % (kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)
+            self.logger.info("Running log directory format command...\n%s" % cmd)
+            node.account.ssh(cmd)
+
         cmd = self.start_cmd(node)
         self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
         with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
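On a given node the format invocation above renders to a command along these
lines (the exact CONFIG_FILE path is defined elsewhere in KafkaService;
/mnt/kafka/kafka.properties is an assumption based on PERSISTENT_ROOT):

    kafka-storage.sh format --ignore-formatted --config /mnt/kafka/kafka.properties --cluster-id 6bd37820-6745-4790-ae98-620300e1f61b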
@@ -390,12 +687,13 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             monitor.wait_until("Kafka\s*Server.*started", timeout_sec=timeout_sec, backoff_sec=.25,
                                err_msg="Kafka server didn't finish startup in %d seconds" % timeout_sec)
 
-        # Credentials for inter-broker communication are created before starting Kafka.
-        # Client credentials are created after starting Kafka so that both loading of
-        # existing credentials from ZK and dynamic update of credentials in Kafka are tested.
-        # We use the admin client and connect as the broker user when creating the client (non-broker) credentials
-        # if Kafka supports KIP-554, otherwise we use ZooKeeper.
-        self.maybe_setup_client_scram_credentials(node)
+        if self.quorum_info.using_zk or self.quorum_info.has_brokers: # TODO: SCRAM currently unsupported for controller quorum
+            # Credentials for inter-broker communication are created before starting Kafka.
+            # Client credentials are created after starting Kafka so that both loading of
+            # existing credentials from ZK and dynamic update of credentials in Kafka are tested.
+            # We use the admin client and connect as the broker user when creating the client (non-broker) credentials
+            # if Kafka supports KIP-554, otherwise we use ZooKeeper.
+            self.maybe_setup_client_scram_credentials(node)
 
         self.start_jmx_tool(self.idx(node), node)
         if len(self.pids(node)) == 0:
@@ -448,6 +746,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT, allow_fail=False)
 
     def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None):
+        if self.quorum_info.using_raft and not self.quorum_info.has_brokers:
+            raise Exception("Must invoke kafka-topics against a broker, not a Raft controller")
         if force_use_zk_connection:
             bootstrap_server_or_zookeeper = "--zookeeper %s" % (self.zk_connect_setting())
             skip_optional_security_settings = True
@@ -485,6 +785,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                 bootstrap_server_or_zookeeper, optional_command_config_suffix)
 
     def kafka_configs_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None):
+        if self.quorum_info.using_raft and not self.quorum_info.has_brokers:
+            raise Exception("Must invoke kafka-configs against a broker, not a Raft controller")
         if force_use_zk_connection:
             # kafka-configs supports a TLS config file, so include it if there is one
             bootstrap_server_or_zookeeper = "--zookeeper %s %s" % (self.zk_connect_setting(), self.zk.zkTlsConfigFileOption())
@@ -719,6 +1021,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         node.account.ssh(cmd)
 
     def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None, override_command_config = None):
+        if self.quorum_info.using_raft and not self.quorum_info.has_brokers:
+            raise Exception("Must invoke kafka-acls against a broker, not a Raft controller")
         force_use_zk_connection = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server
         if force_use_zk_connection:
             bootstrap_server_or_authorizer_zk_props = "--authorizer-properties zookeeper.connect=%s" % (self.zk_connect_setting())
@@ -913,6 +1217,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         return missing
 
     def restart_cluster(self, clean_shutdown=True, timeout_sec=60, after_each_broker_restart=None, *args):
+        # We do not restart the remote controller quorum if it exists.
+        # This is not widely used -- it typically appears in rolling upgrade tests --
+        # so we will let tests explicitly decide if/when to restart any remote controller quorum.
         for node in self.nodes:
             self.restart_node(node, clean_shutdown=clean_shutdown, timeout_sec=timeout_sec)
             if after_each_broker_restart is not None:
@@ -1021,6 +1328,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def cluster_id(self):
         """ Get the current cluster id
         """
+        if self.quorum_info.using_raft:
+            return config_property.CLUSTER_ID
+
         self.logger.debug("Querying ZooKeeper to retrieve cluster id")
         cluster = self.zk.query("/cluster/id", chroot=self.zk_chroot)
 
@@ -1031,6 +1341,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             raise
 
     def topic_id(self, topic):
+        if self.quorum_info.using_raft:
+            raise Exception("Not yet implemented: Cannot obtain topic ID information when using Raft instead of ZooKeeper")
         self.logger.debug(
             "Querying zookeeper to find assigned topic ID for topic %s." % topic)
         zk_path = "/brokers/topics/%s" % topic
@@ -1107,6 +1419,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         return output
 
     def zk_connect_setting(self):
+        if self.quorum_info.using_raft:
+            raise Exception("No zookeeper connect string available when using Raft instead of ZooKeeper")
         return self.zk.connect_setting(self.zk_chroot, self.zk_client_secure)
 
     def __bootstrap_servers(self, port, validate=True, offline_nodes=[]):
@@ -1130,6 +1444,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def controller(self):
         """ Get the controller node
         """
+        if self.quorum_info.using_raft:
+            raise Exception("Cannot obtain Controller node when using Raft instead of ZooKeeper")
         self.logger.debug("Querying zookeeper to find controller broker")
         controller_info = self.zk.query("/controller", chroot=self.zk_chroot)
 
@@ -1147,6 +1463,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         """
         Check whether a broker is registered in Zookeeper
         """
+        if self.quorum_info.using_raft:
+            raise Exception("Cannot obtain broker registration information when using Raft instead of ZooKeeper")
         self.logger.debug("Querying zookeeper to see if broker %s is registered", str(node))
         broker_info = self.zk.query("/brokers/ids/%s" % self.idx(node), chroot=self.zk_chroot)
         self.logger.debug("Broker info: %s", broker_info)
diff --git a/tests/kafkatest/services/kafka/quorum.py b/tests/kafkatest/services/kafka/quorum.py
new file mode 100644
index 0000000..7153463
--- /dev/null
+++ b/tests/kafkatest/services/kafka/quorum.py
@@ -0,0 +1,144 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# the types of metadata quorums we support
+zk = 'ZK' # ZooKeeper, used before/during the KIP-500 bridge release(s)
+colocated_raft = 'COLOCATED_RAFT' # co-located KIP-500 Controllers, used during/after the KIP-500 bridge release(s)
+remote_raft = 'REMOTE_RAFT' # separate KIP-500 Controllers, used during/after the KIP-500 bridge release(s)
+
+# How we will parameterize tests that exercise all quorum styles
+#   ["ZK", "REMOTE_RAFT", "COLOCATED_RAFT"] during the KIP-500 bridge release(s)
+#   ["REMOTE_RAFT", "COLOCATED_RAFT"] after the KIP-500 bridge release(s)
+all = [zk, remote_raft, colocated_raft]
+# How we will parameterize tests that exercise all Raft quorum styles
+all_raft = [remote_raft, colocated_raft]
+# How we will parameterize tests that are unrelated to upgrades:
+#   ["ZK"] before the KIP-500 bridge release(s)
+#   ["ZK", "REMOTE_RAFT"] during the KIP-500 bridge release(s) and in preview releases
+#   ["REMOTE_RAFT"] after the KIP-500 bridge release(s)
+all_non_upgrade = [zk, remote_raft]
+
+def for_test(test_context):
+    # A test uses ZooKeeper if it doesn't specify a metadata quorum or if it explicitly specifies ZooKeeper
+    default_quorum_type = zk
+    arg_name = 'metadata_quorum'
+    retval = default_quorum_type if not test_context.injected_args else test_context.injected_args.get(arg_name, default_quorum_type)
+    if retval not in all:
+        raise Exception("Unknown %s value provided for the test: %s" % (arg_name, retval))
+    return retval
+
+class ServiceQuorumInfo:
+    """
+    Exposes quorum-related information for a KafkaService
+
+    Kafka can use either ZooKeeper or a Raft Controller quorum for its
+    metadata.  Raft Controllers can either be co-located with Kafka in
+    the same JVM or remote in separate JVMs.  The choice is made via
+    the 'metadata_quorum' parameter defined for the system test: if it
+    is not explicitly defined, or if it is set to 'ZK', then ZooKeeper
+    is used.  If it is explicitly set to 'COLOCATED_RAFT' then Raft
+    controllers will be co-located with the brokers; the value
+    'REMOTE_RAFT' indicates remote controllers.
+
+    Attributes
+    ----------
+
+    kafka : KafkaService
+        The service for which this instance exposes quorum-related
+        information
+    quorum_type : str
+        COLOCATED_RAFT, REMOTE_RAFT, or ZK
+    using_zk : bool
+        True iff quorum_type==ZK
+    using_raft : bool
+        True iff quorum_type != ZK
+    has_brokers : bool
+        Whether there is at least one node with process.roles
+        containing 'broker'.  True iff using_raft and the Kafka
+        service doesn't itself have a remote Kafka service (meaning
+        it is not a remote controller quorum).
+    has_controllers : bool
+        Whether there is at least one node with process.roles
+        containing 'controller'.  True iff quorum_type ==
+        COLOCATED_RAFT or the Kafka service itself has a remote Kafka
+        service (meaning it is a remote controller quorum).
+    has_brokers_and_controllers : bool
+        True iff quorum_type==COLOCATED_RAFT
+    """
+
+    def __init__(self, kafka, context):
+        """
+
+        :param kafka : KafkaService
+            The service for which this instance exposes quorum-related
+            information
+        :param context : TestContext
+            The test context within which this instance and the
+            given Kafka service are being instantiated
+        """
+
+        quorum_type = for_test(context)
+        if quorum_type != zk and kafka.zk:
+            raise Exception("Cannot use ZooKeeper while specifying a Raft metadata quorum (should not happen)")
+        if kafka.remote_kafka and quorum_type != remote_raft:
+            raise Exception("Cannot specify a remote Kafka service unless using a remote Raft metadata quorum (should not happen)")
+        self.kafka = kafka
+        self.quorum_type = quorum_type
+        self.using_zk = quorum_type == zk
+        self.using_raft = not self.using_zk
+        self.has_brokers = self.using_raft and not kafka.remote_kafka
+        self.has_controllers = quorum_type == colocated_raft or kafka.remote_kafka
+        self.has_brokers_and_controllers = quorum_type == colocated_raft
+
+class NodeQuorumInfo:
+    """
+    Exposes quorum-related information for a node in a KafkaService
+
+    Attributes
+    ----------
+    service_quorum_info : ServiceQuorumInfo
+        The quorum information about the service to which the node
+        belongs
+    has_broker_role : bool
+        True iff using_raft and the Kafka service doesn't itself have
+        a remote Kafka service (meaning it is not a remote controller)
+    has_controller_role : bool
+        True iff quorum_type==COLOCATED_RAFT and the node is one of
+        the first N in the cluster where N is the number of nodes
+        that have a controller role; or the Kafka service itself has a
+        remote Kafka service (meaning it is a remote controller
+        quorum).
+    has_combined_broker_and_controller_roles : bool
+        True iff has_broker_role==True and has_controller_role==True
+    """
+
+    def __init__(self, service_quorum_info, node):
+        """
+        :param service_quorum_info : ServiceQuorumInfo
+            The quorum information about the service to which the node
+            belongs
+        :param node : Node
+            The particular node for which this information applies.
+            In the co-located case, whether a node's process.roles
+            contains 'controller' may vary from node to node when the
+            number of controller nodes is less than the number of
+            nodes in the service.
+        """
+
+        self.service_quorum_info = service_quorum_info
+        self.has_broker_role = self.service_quorum_info.has_brokers
+        idx = self.service_quorum_info.kafka.nodes.index(node)
+        self.has_controller_role = self.service_quorum_info.kafka.num_nodes_controller_role > idx
+        self.has_combined_broker_and_controller_roles = self.has_broker_role and self.has_controller_role
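
To make the intended usage concrete, a test opts into these quorum styles by
accepting a metadata_quorum argument and parameterizing it with ducktape's
@matrix, creating a ZooKeeperService only when the quorum is ZooKeeper-based.
A hypothetical sketch (the test class, method name, and node counts are made
up, and the service wiring is simplified to the essentials):

    from ducktape.mark import matrix
    from ducktape.tests.test import Test

    from kafkatest.services.kafka import KafkaService, quorum
    from kafkatest.services.zookeeper import ZookeeperService

    class ExampleTest(Test):  # hypothetical test, for illustration only
        @matrix(metadata_quorum=quorum.all_non_upgrade)
        def test_something(self, metadata_quorum=quorum.zk):
            # Skip ZooKeeper entirely when the metadata quorum is Raft-based.
            zk = ZookeeperService(self.test_context, num_nodes=1) \
                if quorum.for_test(self.test_context) == quorum.zk else None
            if zk:
                zk.start()
            kafka = KafkaService(self.test_context, num_nodes=3, zk=zk)
            kafka.start()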
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 96acc2c..d7fa2d2 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -14,18 +14,36 @@
 # limitations under the License.
 
 # see kafka.server.KafkaConfig for additional details and defaults
-advertised.host.name={{ node.account.hostname }}
+{% if quorum_info.using_raft %}
+# The role(s) of this server. Setting this puts us in Raft metadata quorum mode
+{% if node_quorum_info.has_combined_broker_and_controller_roles %}
+process.roles=broker,controller
+{% elif node_quorum_info.has_controller_role %}
+process.roles=controller
+{% else %}
+process.roles=broker
+{% endif %}
+# The connect string for the controller quorum
+controller.quorum.voters={{ controller_quorum_voters }}
 
+controller.listener.names={{ controller_listener_names }}
+
+{% endif %}
 
 listeners={{ listeners }}
-advertised.listeners={{ advertised_listeners }}
+
 listener.security.protocol.map={{ listener_security_protocol_map }}
 
+{% if quorum_info.using_zk or quorum_info.has_brokers %}
+advertised.host.name={{ node.account.hostname }}
+advertised.listeners={{ advertised_listeners }}
+
 {% if node.version.supports_named_listeners() %}
 inter.broker.listener.name={{ interbroker_listener.name }}
 {% else %}
 security.inter.broker.protocol={{ interbroker_listener.security_protocol }}
 {% endif %}
+{% endif %}
 
 {% for k, v in listener_security_config.client_listener_overrides.items() %}
 {% if listener_security_config.requires_sasl_mechanism_prefix(k) %}
@@ -35,6 +53,7 @@ listener.name.{{ security_protocol.lower() }}.{{ k }}={{ v }}
 {% endif %}
 {% endfor %}
 
+{% if quorum_info.using_zk or quorum_info.has_brokers %}
 {% if interbroker_listener.name != security_protocol %}
 {% for k, v in listener_security_config.interbroker_listener_overrides.items() %}
 {% if listener_security_config.requires_sasl_mechanism_prefix(k) %}
@@ -44,6 +63,8 @@ listener.name.{{ interbroker_listener.name.lower() }}.{{ k }}={{ v }}
 {% endif %}
 {% endfor %}
 {% endif %}
+{% endif %}
+
 {% if security_config.tls_version is not none %}
 ssl.enabled.protocols={{ security_config.tls_version }}
 ssl.protocol={{ security_config.tls_version }}
@@ -56,6 +77,8 @@ ssl.truststore.location=/mnt/security/test.truststore.jks
 ssl.truststore.password=test-ts-passwd
 ssl.truststore.type=JKS
 ssl.endpoint.identification.algorithm=HTTPS
+
+{% if quorum_info.using_zk %}
 # Zookeeper TLS settings
 #
 # Note that zookeeper.ssl.client.enable will be set to true or false elsewhere, as appropriate.
@@ -67,8 +90,11 @@ zookeeper.ssl.keystore.password=test-ks-passwd
 {% endif %}
 zookeeper.ssl.truststore.location=/mnt/security/test.truststore.jks
 zookeeper.ssl.truststore.password=test-ts-passwd
+{% endif %}
 #
+{% if quorum_info.using_zk or quorum_info.has_brokers %}
 sasl.mechanism.inter.broker.protocol={{ security_config.interbroker_sasl_mechanism }}
+{% endif %}
 sasl.enabled.mechanisms={{ ",".join(security_config.enabled_sasl_mechanisms) }}
 sasl.kerberos.service.name=kafka
 {% if authorizer_class_name is not none %}
@@ -76,10 +102,12 @@ ssl.client.auth=required
 authorizer.class.name={{ authorizer_class_name }}
 {% endif %}
 
+{% if quorum_info.using_zk %}
 zookeeper.set.acl={{"true" if zk_set_acl else "false"}}
 
 zookeeper.connection.timeout.ms={{ zk_connect_timeout }}
 zookeeper.session.timeout.ms={{ zk_session_timeout }}
+{% endif %}
 
 {% if replica_lag is defined %}
 replica.lag.time.max.ms={{replica_lag}}
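
When quorum_info.using_raft, the role selection at the top of this template
reduces to a simple rule: a node gets the controller role iff it is among the
first num_nodes_controller_role nodes of the service, and the broker role iff
the service has brokers at all. A plain-Python restatement of the Jinja
branches (a sketch driven by the NodeQuorumInfo flags defined earlier, not
code from this commit):

    def process_roles(node_quorum_info):
        # Mirrors the {% if %} chain that picks process.roles in the template.
        if node_quorum_info.has_combined_broker_and_controller_roles:
            return "broker,controller"
        if node_quorum_info.has_controller_role:
            return "controller"
        return "broker"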

