kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3380; Add system test for GetOffsetShell tool
Date Fri, 11 Mar 2016 20:17:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 02d4da5f6 -> a162f6bf6


KAFKA-3380; Add system test for GetOffsetShell tool

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Gwen Shapira

Closes #1048 from SinghAsDev/KAFKA-3380


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a162f6bf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a162f6bf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a162f6bf

Branch: refs/heads/trunk
Commit: a162f6bf66d0d21505c8d11942f84be446616491
Parents: 02d4da5
Author: Ashish Singh <asingh@cloudera.com>
Authored: Fri Mar 11 12:17:45 2016 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Fri Mar 11 12:17:45 2016 -0800

----------------------------------------------------------------------
 tests/kafkatest/services/kafka/kafka.py        | 20 ++++-
 tests/kafkatest/tests/get_offset_shell_test.py | 91 +++++++++++++++++++++
 2 files changed, 110 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a162f6bf/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 358dacf..788d41b 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -490,4 +490,22 @@ class KafkaService(JmxMixin, Service):
 
         controller_idx = int(controller_info["brokerid"])
         self.logger.info("Controller's ID: %d" % (controller_idx))
-        return self.get_node(controller_idx)
\ No newline at end of file
+        return self.get_node(controller_idx)
+
+    def get_offset_shell(self, topic, partitions, max_wait_ms, offsets, time):
+        node = self.nodes[0]
+
+        cmd = "/opt/%s/bin/" % kafka_dir(node)
+        cmd += "kafka-run-class.sh kafka.tools.GetOffsetShell"
+        cmd += " --topic %s --broker-list %s --max-wait-ms %s --offsets %s --time %s" % (topic, self.bootstrap_servers(self.security_protocol), max_wait_ms, offsets, time)
+
+        if partitions:
+            cmd += '  --partitions %s' % partitions
+
+        cmd += " 2>> /mnt/get_offset_shell.log | tee -a /mnt/get_offset_shell.log &"
+        output = ""
+        self.logger.debug(cmd)
+        for line in node.account.ssh_capture(cmd):
+            output += line
+        self.logger.debug(output)
+        return output
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a162f6bf/tests/kafkatest/tests/get_offset_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/get_offset_shell_test.py b/tests/kafkatest/tests/get_offset_shell_test.py
new file mode 100644
index 0000000..38bd9dc
--- /dev/null
+++ b/tests/kafkatest/tests/get_offset_shell_test.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.utils.util import wait_until
+from ducktape.tests.test import Test
+from kafkatest.services.verifiable_producer import VerifiableProducer
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.security.security_config import SecurityConfig
+
+TOPIC = "topic-get-offset-shell"
+MAX_MESSAGES = 100
+NUM_PARTITIONS = 1
+REPLICATION_FACTOR = 1
+
+class GetOffsetShellTest(Test):
+    """
+    Tests GetOffsetShell tool
+    """
+    def __init__(self, test_context):
+        super(GetOffsetShellTest, self).__init__(test_context)
+        self.num_zk = 1
+        self.num_brokers = 1
+        self.messages_received_count = 0
+        self.topics = {
+            TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR}
+        }
+
+        self.zk = ZookeeperService(test_context, self.num_zk)
+
+
+
+    def setUp(self):
+        self.zk.start()
+
+    def start_kafka(self, security_protocol, interbroker_security_protocol):
+        self.kafka = KafkaService(
+            self.test_context, self.num_brokers,
+            self.zk, security_protocol=security_protocol,
+            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
+        self.kafka.start()
+
+    def start_producer(self):
+        # This will produce to kafka cluster
+        self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES)
+        self.producer.start()
+        current_acked = self.producer.num_acked
+        wait_until(lambda: self.producer.num_acked >= current_acked + MAX_MESSAGES, timeout_sec=10,
+                   err_msg="Timeout awaiting messages to be produced and acked")
+
+    def start_consumer(self, security_protocol):
+        enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
+        self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
+                                        consumer_timeout_ms=1000, new_consumer=enable_new_consumer)
+        self.consumer.start()
+
+    def test_get_offset_shell(self, security_protocol='PLAINTEXT'):
+        """
+        Tests if GetOffsetShell is getting offsets correctly
+        :return: None
+        """
+        self.start_kafka(security_protocol, security_protocol)
+        self.start_producer()
+
+        # Assert that offset fetched without any consumers consuming is 0
+        assert self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, 0)
+
+        self.start_consumer(security_protocol)
+
+        node = self.consumer.nodes[0]
+
+        wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
+
+        # Assert that offset is correctly indicated by GetOffsetShell tool
+        wait_until(lambda: "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, MAX_MESSAGES) in self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), timeout_sec=10,
+                   err_msg="Timed out waiting to reach expected offset.")
\ No newline at end of file


Mime
View raw message