kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-2896; Added system test for partition re-assignment
Date Fri, 11 Dec 2015 01:07:04 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a29f80dc7 -> a5f1537f8


KAFKA-2896; Added system test for partition re-assignment

Partition re-assignment tests with and without broker failure.

Author: Anna Povzner <anna@confluent.io>

Reviewers: Ben Stopford <ben@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>,
Geoff Anderson <geoff@confluent.io>

Closes #655 from apovzner/kafka_2896


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a5f1537f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a5f1537f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a5f1537f

Branch: refs/heads/trunk
Commit: a5f1537f8762f4787b2af106c5d93c156f906074
Parents: a29f80d
Author: Anna Povzner <anna@confluent.io>
Authored: Thu Dec 10 17:06:47 2015 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu Dec 10 17:06:47 2015 -0800

----------------------------------------------------------------------
 tests/kafkatest/services/kafka/kafka.py         |  29 +++++
 .../kafkatest/tests/reassign_partitions_test.py | 109 +++++++++++++++++++
 2 files changed, 138 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a5f1537f/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 62ccc92..5fee147 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -263,6 +263,35 @@ class KafkaService(JmxMixin, Service):
             output += line
         return output
 
+    def parse_describe_topic(self, topic_description):
+        """Parse output of kafka-topics.sh --describe (or describe_topic() method above),
which is a string of form
+        PartitionCount:2\tReplicationFactor:2\tConfigs:
+            Topic: test_topic\ttPartition: 0\tLeader: 3\tReplicas: 3,1\tIsr: 3,1
+            Topic: test_topic\tPartition: 1\tLeader: 1\tReplicas: 1,2\tIsr: 1,2
+        into a dictionary structure appropriate for use with reassign-partitions tool:
+        {
+            "partitions": [
+                {"topic": "test_topic", "partition": 0, "replicas": [3, 1]},
+                {"topic": "test_topic", "partition": 1, "replicas": [1, 2]}
+            ]
+        }
+        """
+        lines = map(lambda x: x.strip(), topic_description.split("\n"))
+        partitions = []
+        for line in lines:
+            m = re.match(".*Leader:.*", line)
+            if m is None:
+                continue
+
+            fields = line.split("\t")
+            # ["Partition: 4", "Leader: 0"] -> ["4", "0"]
+            fields = map(lambda x: x.split(" ")[1], fields)
+            partitions.append(
+                {"topic": fields[0],
+                 "partition": int(fields[1]),
+                 "replicas": map(int, fields[3].split(','))})
+        return {"partitions": partitions}
+
     def verify_reassign_partitions(self, reassignment, node=None):
         """Run the reassign partitions admin tool in "verify" mode
         """

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5f1537f/tests/kafkatest/tests/reassign_partitions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/reassign_partitions_test.py b/tests/kafkatest/tests/reassign_partitions_test.py
new file mode 100644
index 0000000..0ac12f5
--- /dev/null
+++ b/tests/kafkatest/tests/reassign_partitions_test.py
@@ -0,0 +1,109 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+import random
+
+class ReassignPartitionsTest(ProduceConsumeValidateTest):
+    """
+    These tests validate partition reassignment.
+    Create a topic with few partitions, load some data, trigger partition re-assignment with
and without broker failure,
+    check that partition re-assignment can complete and there is no data loss.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(ReassignPartitionsTest, self).__init__(test_context=test_context)
+
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic:
{
+                                                                    "partitions": 20,
+                                                                    "replication-factor":
3,
+                                                                    'configs': {"min.insync.replicas":
2}}
+                                                                })
+        self.num_partitions = 20
+        self.timeout_sec = 60
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+    def setUp(self):
+        self.zk.start()
+
+    def min_cluster_size(self):
+        # Override this since we're adding services outside of the constructor
+        return super(ReassignPartitionsTest, self).min_cluster_size() + self.num_producers
+ self.num_consumers
+
+    def clean_bounce_some_brokers(self):
+        """Bounce every other broker"""
+        for node in self.kafka.nodes[::2]:
+            self.kafka.restart_node(node, clean_shutdown=True)
+
+    def reassign_partitions(self, bounce_brokers):
+        partition_info = self.kafka.parse_describe_topic(self.kafka.describe_topic(self.topic))
+        self.logger.debug("Partitions before reassignment:" + str(partition_info))
+
+        # jumble partition assignment in dictionary
+        seed = random.randint(0, 2 ** 31 - 1)
+        self.logger.debug("Jumble partition assignment with seed " + str(seed))
+        random.seed(seed)
+        # The list may still be in order, but that's ok
+        shuffled_list = range(0, self.num_partitions)
+        random.shuffle(shuffled_list)
+
+        for i in range(0, self.num_partitions):
+            partition_info["partitions"][i]["partition"] = shuffled_list[i]
+        self.logger.debug("Jumbled partitions: " + str(partition_info))
+
+        # send reassign partitions command
+        self.kafka.execute_reassign_partitions(partition_info)
+
+        if bounce_brokers:
+            # bounce a few brokers at the same time
+            self.clean_bounce_some_brokers()
+
+        # Wait until finished or timeout
+        wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), timeout_sec=self.timeout_sec,
backoff_sec=.5)
+
+    @parametrize(security_protocol="PLAINTEXT", bounce_brokers=False)
+    @parametrize(security_protocol="PLAINTEXT", bounce_brokers=True)
+    def test_reassign_partitions(self, bounce_brokers, security_protocol):
+        """Reassign partitions tests.
+        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and
min.insync.replicas=2
+
+            - Produce messages in the background
+            - Consume messages in the background
+            - Reassign partitions
+            - If bounce_brokers is True, also bounce a few brokers while partition re-assignment
is in progress
+            - When done reassigning partitions and bouncing brokers, stop producing, and
finish consuming
+            - Validate that every acked message was consumed
+        """
+
+        self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = security_protocol
+        new_consumer = False if  self.kafka.security_protocol == "PLAINTEXT" else True
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
self.topic, throughput=self.producer_throughput)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
+        self.kafka.start()
+        
+        self.run_produce_consume_validate(core_test_action=self.reassign_partitions(bounce_brokers))


Mime
View raw message