kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [1/2] kafka git commit: KAFKA-4140: Upgrade to ducktape 0.6.0 and make system tests parallel friendly
Date Mon, 12 Dec 2016 02:43:26 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6f7ed15da -> 62e043a86


http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/security_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
index b6bc656..4edbcff 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -13,7 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+from ducktape.errors import TimeoutError
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -23,20 +27,19 @@ from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.security.security_config import SslStores
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
-import time
 
 class TestSslStores(SslStores):
-    def __init__(self):
-        super(TestSslStores, self).__init__()
-        self.invalid_hostname = False
+    def __init__(self, local_scratch_dir, valid_hostname=True):
+        super(TestSslStores, self).__init__(local_scratch_dir)
+        self.valid_hostname = valid_hostname
         self.generate_ca()
         self.generate_truststore()
 
     def hostname(self, node):
-        if (self.invalid_hostname):
-            return "invalidhost"
-        else:
+        if self.valid_hostname:
             return super(TestSslStores, self).hostname(node)
+        else:
+            return "invalidhostname"
 
 class SecurityTest(ProduceConsumeValidateTest):
     """
@@ -62,6 +65,18 @@ class SecurityTest(ProduceConsumeValidateTest):
     def setUp(self):
         self.zk.start()
 
+    def producer_consumer_have_expected_error(self, error):
+        try:
+            for node in self.producer.nodes:
+                node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE))
+            for node in self.consumer.nodes:
+                node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE))
+        except RemoteCommandError:
+            return False
+
+        return True
+
+    @cluster(num_nodes=7)
     @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='SSL')
     @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
     def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol):
@@ -74,29 +89,35 @@ class SecurityTest(ProduceConsumeValidateTest):
 
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = interbroker_security_protocol
-        SecurityConfig.ssl_stores = TestSslStores()
+        SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir, valid_hostname=False)
 
-        SecurityConfig.ssl_stores.invalid_hostname = True
         self.kafka.start()
         self.create_producer_and_consumer()
         self.producer.log_level = "TRACE"
+
         self.producer.start()
         self.consumer.start()
-        time.sleep(10)
-        assert self.producer.num_acked == 0, "Messages published successfully, endpoint validation
did not fail with invalid hostname"
-        error = 'SSLHandshakeException' if security_protocol is 'SSL' else 'LEADER_NOT_AVAILABLE'
-        for node in self.producer.nodes:
-            node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE))
-        for node in self.consumer.nodes:
-            node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE))
+        try:
+            wait_until(lambda: self.producer.num_acked > 0, timeout_sec=5)
+
+            # Fail quickly if messages are successfully acked
+            raise RuntimeError("Messages published successfully but should not have!"
+                               " Endpoint validation did not fail with invalid hostname")
+        except TimeoutError:
+            # expected
+            pass
+
+        error = 'SSLHandshakeException' if security_protocol == 'SSL' else 'LEADER_NOT_AVAILABLE'
+        wait_until(lambda: self.producer_consumer_have_expected_error(error), timeout_sec=5)
 
         self.producer.stop()
         self.consumer.stop()
         self.producer.log_level = "INFO"
 
-        SecurityConfig.ssl_stores.invalid_hostname = False
+        SecurityConfig.ssl_stores.valid_hostname = True
         for node in self.kafka.nodes:
             self.kafka.restart_node(node, clean_shutdown=True)
+
         self.create_producer_and_consumer()
         self.run_produce_consume_validate()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/simple_consumer_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/simple_consumer_shell_test.py b/tests/kafkatest/tests/core/simple_consumer_shell_test.py
index 74a7eeb..882aae7 100644
--- a/tests/kafkatest/tests/core/simple_consumer_shell_test.py
+++ b/tests/kafkatest/tests/core/simple_consumer_shell_test.py
@@ -16,6 +16,8 @@
 
 from ducktape.utils.util import wait_until
 from ducktape.tests.test import Test
+from ducktape.mark.resource import cluster
+
 from kafkatest.services.simple_consumer_shell import SimpleConsumerShell
 from kafkatest.services.verifiable_producer import VerifiableProducer
 
@@ -26,6 +28,7 @@ MAX_MESSAGES = 100
 NUM_PARTITIONS = 1
 REPLICATION_FACTOR = 1
 
+
 class SimpleConsumerShellTest(Test):
     """
     Tests SimpleConsumerShell tool
@@ -61,6 +64,7 @@ class SimpleConsumerShellTest(Test):
         self.simple_consumer_shell = SimpleConsumerShell(self.test_context, 1, self.kafka,
TOPIC)
         self.simple_consumer_shell.start()
 
+    @cluster(num_nodes=4)
     def test_simple_consumer_shell(self):
         """
         Tests if SimpleConsumerShell is fetching expected records

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/throttling_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py
index 2e21322..9684099 100644
--- a/tests/kafkatest/tests/core/throttling_test.py
+++ b/tests/kafkatest/tests/core/throttling_test.py
@@ -16,6 +16,7 @@
 import time
 import math
 from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 
 from kafkatest.services.performance import ProducerPerformanceService
@@ -137,6 +138,7 @@ class ThrottlingTest(ProduceConsumeValidateTest):
                 estimated_throttled_time,
                 time_taken))
 
+    @cluster(num_nodes=10)
     @parametrize(bounce_brokers=False)
     @parametrize(bounce_brokers=True)
     def test_throttled_reassignment(self, bounce_brokers):

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py
index 15a9696..34af4eb 100644
--- a/tests/kafkatest/tests/core/upgrade_test.py
+++ b/tests/kafkatest/tests/core/upgrade_test.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
 
 import json
 
@@ -60,10 +61,13 @@ class TestUpgrade(ProduceConsumeValidateTest):
                 node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
             self.kafka.start_node(node)
 
+    @cluster(num_nodes=6)
     @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"],
new_consumer=False)
     @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"])
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"],
new_consumer=False)
+    @cluster(num_nodes=7)
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"],
security_protocol="SASL_SSL")
+    @cluster(num_nodes=6)
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"])
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"],
new_consumer=False)
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"])
@@ -71,6 +75,7 @@ class TestUpgrade(ProduceConsumeValidateTest):
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9),
compression_types=["snappy"])
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9),
compression_types=["lz4"], new_consumer=False)
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9),
compression_types=["lz4"])
+    @cluster(num_nodes=7)
     @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"],
new_consumer=False)
     @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"],
new_consumer=False)
     def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types,

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
index 0cfdf16..f8b2146 100644
--- a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
+++ b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -92,7 +93,8 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
             self.kafka.stop_node(node)
             self.kafka.start_node(node)
 
-    @matrix(security_protocol=["PLAINTEXT","SSL","SASL_SSL","SASL_PLAINTEXT"])
+    @cluster(num_nodes=9)
+    @matrix(security_protocol=["PLAINTEXT", "SSL", "SASL_SSL", "SASL_PLAINTEXT"])
     def test_zk_security_upgrade(self, security_protocol):
         self.zk.start()
         self.kafka.security_protocol = security_protocol
@@ -103,7 +105,7 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
             self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
             self.acls.set_acls(security_protocol, self.kafka, self.zk, self.topic, self.group)
 
-        if(self.no_sasl):
+        if self.no_sasl:
             self.kafka.start()
         else:
             self.kafka.start(self.zk.zk_principals)

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/streams/streams_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_bounce_test.py b/tests/kafkatest/tests/streams/streams_bounce_test.py
index d674641..169bbc1 100644
--- a/tests/kafkatest/tests/streams/streams_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_bounce_test.py
@@ -13,12 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.mark import ignore
+from ducktape.mark.resource import cluster
 
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
 import time
 
+
 class StreamsBounceTest(KafkaTest):
     """
     Simple test of Kafka Streams.
@@ -41,6 +42,7 @@ class StreamsBounceTest(KafkaTest):
         self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
         self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
 
+    @cluster(num_nodes=5)
     def test_bounce(self):
         """
         Start a smoke test client, then abort (kill -9) and restart it a few times.
@@ -51,11 +53,11 @@ class StreamsBounceTest(KafkaTest):
 
         self.processor1.start()
 
-        time.sleep(15);
+        time.sleep(15)
 
         self.processor1.abortThenRestart()
 
-        time.sleep(15);
+        time.sleep(15)
 
         # enable this after we add change log partition replicas
         #self.kafka.signal_leader("data")

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/streams/streams_smoke_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py
index ea05c5f..bc84878 100644
--- a/tests/kafkatest/tests/streams/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_smoke_test.py
@@ -13,12 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.mark import ignore
+
+from ducktape.mark.resource import cluster
 
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
 import time
 
+
 class StreamsSmokeTest(KafkaTest):
     """
     Simple test of Kafka Streams.
@@ -45,6 +47,7 @@ class StreamsSmokeTest(KafkaTest):
         self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
 
     @ignore
+    @cluster(num_nodes=7)
     def test_streams(self):
         """
         Start a few smoke test clients, then repeat start a new one, stop (cleanly) running
one a few times.
@@ -56,14 +59,14 @@ class StreamsSmokeTest(KafkaTest):
         self.processor1.start()
         self.processor2.start()
 
-        time.sleep(15);
+        time.sleep(15)
 
         self.processor3.start()
         self.processor1.stop()
 
-        time.sleep(15);
+        time.sleep(15)
 
-        self.processor4.start();
+        self.processor4.start()
 
         self.driver.wait()
         self.driver.stop()

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/tools/log4j_appender_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/tools/log4j_appender_test.py b/tests/kafkatest/tests/tools/log4j_appender_test.py
index 42cfeea..7e0b9ee 100644
--- a/tests/kafkatest/tests/tools/log4j_appender_test.py
+++ b/tests/kafkatest/tests/tools/log4j_appender_test.py
@@ -17,6 +17,7 @@
 from ducktape.utils.util import wait_until
 from ducktape.tests.test import Test
 from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
@@ -27,6 +28,7 @@ from kafkatest.services.security.security_config import SecurityConfig
 TOPIC = "topic-log4j-appender"
 MAX_MESSAGES = 100
 
+
 class Log4jAppenderTest(Test):
     """
     Tests KafkaLog4jAppender using VerifiableKafkaLog4jAppender that appends increasing ints
to a Kafka topic
@@ -62,7 +64,6 @@ class Log4jAppenderTest(Test):
             self.logger.debug("Received message: %s" % msg)
             self.messages_received_count += 1
 
-
     def start_consumer(self, security_protocol):
         enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
         self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka,
topic=TOPIC,
@@ -70,7 +71,10 @@ class Log4jAppenderTest(Test):
                                         message_validator=self.custom_message_validator)
         self.consumer.start()
 
-    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
+    @cluster(num_nodes=4)
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    @cluster(num_nodes=5)
+    @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
     def test_log4j_appender(self, security_protocol='PLAINTEXT'):
         """
         Tests if KafkaLog4jAppender is producing to Kafka topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/tools/replica_verification_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/tools/replica_verification_test.py b/tests/kafkatest/tests/tools/replica_verification_test.py
index 1b625e9..a5b3390 100644
--- a/tests/kafkatest/tests/tools/replica_verification_test.py
+++ b/tests/kafkatest/tests/tools/replica_verification_test.py
@@ -16,8 +16,9 @@
 
 from ducktape.utils.util import wait_until
 from ducktape.tests.test import Test
-from kafkatest.services.verifiable_producer import VerifiableProducer
+from ducktape.mark.resource import cluster
 
+from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.replica_verification_tool import ReplicaVerificationTool
@@ -59,9 +60,8 @@ class ReplicaVerificationToolTest(Test):
 
     def start_producer(self, max_messages, acks, timeout):
         # This will produce to kafka cluster
+        current_acked = 0
         self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka,
topic=TOPIC, throughput=1000, acks=acks, max_messages=max_messages)
-        current_acked = self.producer.num_acked
-        self.logger.info("current_acked = %s" % current_acked)
         self.producer.start()
         wait_until(lambda: acks == 0 or self.producer.num_acked >= current_acked + max_messages,
timeout_sec=timeout,
                    err_msg="Timeout awaiting messages to be produced and acked")
@@ -69,6 +69,7 @@ class ReplicaVerificationToolTest(Test):
     def stop_producer(self):
         self.producer.stop()
 
+    @cluster(num_nodes=6)
     def test_replica_lags(self, security_protocol='PLAINTEXT'):
         """
         Tests ReplicaVerificationTool
@@ -77,6 +78,7 @@ class ReplicaVerificationToolTest(Test):
         self.start_kafka(security_protocol, security_protocol)
         self.start_replica_verification_tool(security_protocol)
         self.start_producer(max_messages=10, acks=-1, timeout=15)
+
         # Verify that there is no lag in replicas and is correctly reported by ReplicaVerificationTool
         wait_until(lambda: self.replica_verifier.get_lag_for_partition(TOPIC, 0) == 0, timeout_sec=10,
                    err_msg="Timed out waiting to reach zero replica lags.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/setup.py
----------------------------------------------------------------------
diff --git a/tests/setup.py b/tests/setup.py
index cae0a3f..e43a4ab 100644
--- a/tests/setup.py
+++ b/tests/setup.py
@@ -50,7 +50,7 @@ setup(name="kafkatest",
       license="apache2.0",
       packages=find_packages(),
       include_package_data=True,
-      install_requires=["ducktape==0.5.3", "requests>=2.5.0"],
+      install_requires=["ducktape==0.6.0", "requests>=2.5.0"],
       tests_require=["pytest", "mock"],
       cmdclass={'test': PyTest},
       )


Mime
View raw message