kafka-commits mailing list archives

From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7773; Add end to end system test relying on verifiable consumer (#6070)
Date Tue, 08 Jan 2019 14:15:16 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f9a22f4  KAFKA-7773; Add end to end system test relying on verifiable consumer (#6070)
f9a22f4 is described below

commit f9a22f42a842b2d80ac687567ae04e91dadebefe
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue Jan 8 06:14:51 2019 -0800

    KAFKA-7773; Add end to end system test relying on verifiable consumer (#6070)
    
    This commit creates an EndToEndTest base class which relies on the verifiable consumer.
    This will ultimately replace ProduceConsumeValidateTest which depends on the console consumer.
    The advantage is that the verifiable consumer exposes more information to use for validation.
    It also allows for a nicer shutdown pattern. Rather than relying on the console consumer idle
    timeout, which requires a minimum wait time, we can halt consumption after we have reached
    the last acked offsets. Thi [...]
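
    As a rough usage sketch (hypothetical test class and method names; the calls mirror the
    pattern adopted by replication_test.py in the diff below), a test built on the new
    EndToEndTest base class looks roughly like this:

        from kafkatest.tests.end_to_end import EndToEndTest

        class ExampleEndToEndTest(EndToEndTest):  # hypothetical subclass for illustration
            def test_produce_consume(self):
                # Bring up ZooKeeper and a small Kafka cluster
                self.create_zookeeper()
                self.zk.start()
                self.create_kafka(num_nodes=3)
                self.kafka.start()

                # Start the verifiable producer and consumer in the background
                self.create_producer()
                self.producer.start()
                self.create_consumer()
                self.consumer.start()

                # Wait for a few records to flow end to end, run the test action of
                # interest here, then stop the producer and wait for the consumer to
                # reach the last acked offsets before validating delivery
                self.await_startup()
                self.run_validation()
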
---
 tests/kafkatest/services/kafka/kafka.py            |   4 +
 .../services/kafka/templates/kafka.properties      |   1 +
 tests/kafkatest/services/verifiable_consumer.py    |   8 +-
 tests/kafkatest/services/verifiable_producer.py    |   9 ++
 tests/kafkatest/tests/core/replication_test.py     |  64 ++++-----
 tests/kafkatest/tests/core/security_test.py        |  59 +++-----
 tests/kafkatest/tests/end_to_end.py                | 151 +++++++++++++++++++++
 tests/kafkatest/tests/produce_consume_validate.py  |  68 ++--------
 tests/kafkatest/utils/__init__.py                  |   2 +-
 tests/kafkatest/utils/util.py                      |  56 ++++++++
 10 files changed, 290 insertions(+), 132 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 75ec993..2258e27 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -532,6 +532,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
         return missing
 
+    def restart_cluster(self, clean_shutdown=True):
+        for node in self.nodes:
+            self.restart_node(node, clean_shutdown=clean_shutdown)
+
     def restart_node(self, node, clean_shutdown=True):
         """Restart the given node."""
         self.stop_node(node, clean_shutdown)
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index dd777f9..4362978 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -50,6 +50,7 @@ replica.lag.time.max.ms={{replica_lag}}
 {% if auto_create_topics_enable is defined and auto_create_topics_enable is not none %}
 auto.create.topics.enable={{ auto_create_topics_enable }}
 {% endif %}
+offsets.topic.num.partitions={{ num_nodes }}
 offsets.topic.replication.factor={{ 3 if num_nodes > 3 else num_nodes }}
 # Set to a low, but non-zero value to exercise this path without making tests much slower
 group.initial.rebalance.delay.ms=100
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index a48afcf..6e5d50e 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -161,7 +161,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
     def __init__(self, context, num_nodes, kafka, topic, group_id,
                  max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
                  assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor",
-                 version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", jaas_override_variables=None):
+                 version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", jaas_override_variables=None,
+                 on_record_consumed=None):
         """
         :param jaas_override_variables: A dict of variables to be used in the jaas.conf template file
         """
@@ -177,6 +178,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         self.assignment_strategy = assignment_strategy
         self.prop_file = ""
         self.stop_timeout_sec = stop_timeout_sec
+        self.on_record_consumed = on_record_consumed
 
         self.event_handlers = {}
         self.global_position = {}
@@ -228,6 +230,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
                     elif name == "records_consumed":
                         handler.handle_records_consumed(event)
                         self._update_global_position(event, node)
+                    elif name == "record_data" and self.on_record_consumed:
+                        self.on_record_consumed(event, node)
                     elif name == "partitions_revoked":
                         handler.handle_partitions_revoked(event)
                     elif name == "partitions_assigned":
@@ -268,6 +272,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG
         cmd += self.impl.exec_cmd(node)
+        if self.on_record_consumed:
+            cmd += " --verbose"
         cmd += " --group-id %s --topic %s --broker-list %s --session-timeout %s --assignment-strategy
%s %s" % \
                (self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol),
                self.session_timeout_sec*1000, self.assignment_strategy, "--enable-autocommit" if self.enable_autocommit else "")
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 744524e..3322d16 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -20,6 +20,7 @@ import time
 from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.services.background_thread import BackgroundThreadService
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.kafka import TopicPartition
 from kafkatest.services.verifiable_client import VerifiableClientMixin
 from kafkatest.utils import is_int, is_int_with_prefix
 from kafkatest.version import DEV_BRANCH
@@ -90,6 +91,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         for node in self.nodes:
             node.version = version
         self.acked_values = []
+        self._last_acked_offsets = {}
         self.not_acked_values = []
         self.produced_count = {}
         self.clean_shutdown_nodes = set()
@@ -175,7 +177,9 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
                         self.produced_count[idx] += 1
 
                     elif data["name"] == "producer_send_success":
+                        partition = TopicPartition(data["topic"], data["partition"])
                         self.acked_values.append(self.message_validator(data["value"]))
+                        self._last_acked_offsets[partition] = data["offset"]
                         self.produced_count[idx] += 1
 
                         # Log information if there is a large gap between successively acknowledged messages
@@ -242,6 +246,11 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         return len(self.pids(node)) > 0
 
     @property
+    def last_acked_offsets(self):
+        with self.lock:
+            return self._last_acked_offsets
+
+    @property
     def acked(self):
         with self.lock:
             return self.acked_values
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
index c16d679..f5c6422 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -19,12 +19,7 @@ from ducktape.mark import matrix
 from ducktape.mark import parametrize
 from ducktape.mark.resource import cluster
 
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
+from kafkatest.tests.end_to_end import EndToEndTest
 
 import signal
 
@@ -83,7 +78,7 @@ failures = {
 }
 
 
-class ReplicationTest(ProduceConsumeValidateTest):
+class ReplicationTest(EndToEndTest):
     """
     Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
     (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
@@ -98,25 +93,16 @@ class ReplicationTest(ProduceConsumeValidateTest):
     indicator that nothing is left to consume.
     """
 
+    TOPIC_CONFIG = {
+        "partitions": 3,
+        "replication-factor": 3,
+        "configs": {"min.insync.replicas": 2}
+    }
+ 
     def __init__(self, test_context):
         """:type test_context: ducktape.tests.test.TestContext"""
-        super(ReplicationTest, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk,
-                                  topics={self.topic: {
-                                      "partitions": 3,
-                                      "replication-factor": 3,
-                                      'configs': {"min.insync.replicas": 2}}
-                                  })
-        self.producer_throughput = 1000
-        self.num_producers = 1
-        self.num_consumers = 1
-
-    def setUp(self):
-        self.zk.start()
-
+        super(ReplicationTest, self).__init__(test_context=test_context, topic_config=self.TOPIC_CONFIG)
+ 
     def min_cluster_size(self):
         """Override this since we're adding services outside of the constructor"""
         return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
@@ -156,15 +142,23 @@ class ReplicationTest(ProduceConsumeValidateTest):
             - Validate that every acked message was consumed
         """
 
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = security_protocol
-        self.kafka.client_sasl_mechanism = client_sasl_mechanism
-        self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
-        self.enable_idempotence = enable_idempotence
-        compression_types = None if not compression_type else [compression_type] * self.num_producers
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic,
-                                           throughput=self.producer_throughput, compression_types=compression_types,
-                                           enable_idempotence=enable_idempotence)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=60000, message_validator=is_int)
+        self.create_zookeeper()
+        self.zk.start()
+
+        self.create_kafka(num_nodes=3,
+                          security_protocol=security_protocol,
+                          interbroker_security_protocol=security_protocol,
+                          client_sasl_mechanism=client_sasl_mechanism,
+                          interbroker_sasl_mechanism=interbroker_sasl_mechanism)
         self.kafka.start()
-        self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type))
+
+        compression_types = None if not compression_type else [compression_type]
+        self.create_producer(compression_types=compression_types, enable_idempotence=enable_idempotence)
+        self.producer.start()
+
+        self.create_consumer(log_level="DEBUG")
+        self.consumer.start()
+
+        self.await_startup()
+        failures[failure_mode](self, broker_type)
+        self.run_validation(enable_idempotence=enable_idempotence)
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
index 4edbcff..d0bebfe 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -19,14 +19,9 @@ from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 from ducktape.errors import TimeoutError
 
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.security.security_config import SslStores
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
+from kafkatest.tests.end_to_end import EndToEndTest
 
 class TestSslStores(SslStores):
     def __init__(self, local_scratch_dir, valid_hostname=True):
@@ -41,7 +36,7 @@ class TestSslStores(SslStores):
         else:
             return "invalidhostname"
 
-class SecurityTest(ProduceConsumeValidateTest):
+class SecurityTest(EndToEndTest):
     """
     These tests validate security features.
     """
@@ -50,21 +45,6 @@ class SecurityTest(ProduceConsumeValidateTest):
         """:type test_context: ducktape.tests.test.TestContext"""
         super(SecurityTest, self).__init__(test_context=test_context)
 
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 2,
-                                                                    "replication-factor": 1}
-                                                                })
-        self.num_partitions = 2
-        self.timeout_sec = 10000
-        self.producer_throughput = 1000
-        self.num_producers = 1
-        self.num_consumers = 1
-
-    def setUp(self):
-        self.zk.start()
-
     def producer_consumer_have_expected_error(self, error):
         try:
             for node in self.producer.nodes:
@@ -87,16 +67,19 @@ class SecurityTest(ProduceConsumeValidateTest):
         with hostname verification failure. Hence clients are expected to fail with LEADER_NOT_AVAILABLE.
         """
 
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = interbroker_security_protocol
-        SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir, valid_hostname=False)
+        SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir,
+                                                  valid_hostname=False)
+
+        self.create_zookeeper()
+        self.zk.start()
 
+        self.create_kafka(security_protocol=security_protocol,
+                          interbroker_security_protocol=interbroker_security_protocol)
         self.kafka.start()
-        self.create_producer_and_consumer()
-        self.producer.log_level = "TRACE"
 
-        self.producer.start()
-        self.consumer.start()
+        # We need more verbose logging to catch the expected errors
+        self.create_and_start_clients(log_level="DEBUG")
+
         try:
             wait_until(lambda: self.producer.num_acked > 0, timeout_sec=5)
 
@@ -109,19 +92,17 @@ class SecurityTest(ProduceConsumeValidateTest):
 
         error = 'SSLHandshakeException' if security_protocol == 'SSL' else 'LEADER_NOT_AVAILABLE'
         wait_until(lambda: self.producer_consumer_have_expected_error(error), timeout_sec=5)
-
         self.producer.stop()
         self.consumer.stop()
-        self.producer.log_level = "INFO"
 
         SecurityConfig.ssl_stores.valid_hostname = True
-        for node in self.kafka.nodes:
-            self.kafka.restart_node(node, clean_shutdown=True)
-
-        self.create_producer_and_consumer()
-        self.run_produce_consume_validate()
+        self.kafka.restart_cluster()
+        self.create_and_start_clients(log_level="INFO")
+        self.run_validation()
 
-    def create_producer_and_consumer(self):
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=10000, message_validator=is_int)
+    def create_and_start_clients(self, log_level):
+        self.create_producer(log_level=log_level)
+        self.producer.start()
 
+        self.create_consumer(log_level=log_level)
+        self.consumer.start()
diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py
new file mode 100644
index 0000000..9cc6b41
--- /dev/null
+++ b/tests/kafkatest/tests/end_to_end.py
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import TopicPartition
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.utils import validate_delivery
+
+import time
+
+class EndToEndTest(Test):
+    """This class provides a shared template for tests which follow the common pattern of:
+
+        - produce to a topic in the background
+        - consume from that topic in the background
+        - run some logic, e.g. fail topic leader etc.
+        - perform validation
+    """
+
+    DEFAULT_TOPIC_CONFIG = {"partitions": 2, "replication-factor": 1}
+
+    def __init__(self, test_context, topic="test_topic", topic_config=DEFAULT_TOPIC_CONFIG):
+        super(EndToEndTest, self).__init__(test_context=test_context)
+        self.topic = topic
+        self.topic_config = topic_config
+        self.records_consumed = []
+        self.last_consumed_offsets = {}
+        
+    def create_zookeeper(self, num_nodes=1, **kwargs):
+        self.zk = ZookeeperService(self.test_context, num_nodes=num_nodes, **kwargs)
+
+    def create_kafka(self, num_nodes=1, **kwargs):
+        group_metadata_config = {
+            "partitions": num_nodes,
+            "replication-factor": min(num_nodes, 3),
+            "configs": {"cleanup.policy": "compact"}
+        }
+
+        topics = {
+            self.topic: self.topic_config,
+            "__consumer_offsets": group_metadata_config
+        }
+        self.kafka = KafkaService(self.test_context, num_nodes=num_nodes,
+                                  zk=self.zk, topics=topics, **kwargs)
+
+    def create_consumer(self, num_nodes=1, group_id="test_group", **kwargs):
+        self.consumer = VerifiableConsumer(self.test_context,
+                                           num_nodes=num_nodes,
+                                           kafka=self.kafka,
+                                           topic=self.topic,
+                                           group_id=group_id,
+                                           on_record_consumed=self.on_record_consumed,
+                                           **kwargs)
+                                    
+
+    def create_producer(self, num_nodes=1, throughput=1000, **kwargs):
+        self.producer = VerifiableProducer(self.test_context,
+                                           num_nodes=num_nodes,
+                                           kafka=self.kafka,
+                                           topic=self.topic,
+                                           throughput=throughput,
+                                           **kwargs)
+
+    def on_record_consumed(self, record, node):
+        partition = TopicPartition(record["topic"], record["partition"])
+        record_id = int(record["value"])
+        offset = record["offset"]
+        self.last_consumed_offsets[partition] = offset
+        self.records_consumed.append(record_id)
+
+    def await_consumed_offsets(self, last_acked_offsets, timeout_sec):
+        def has_finished_consuming():
+            for partition, offset in last_acked_offsets.iteritems():
+                if not partition in self.last_consumed_offsets:
+                    return False
+                if self.last_consumed_offsets[partition] < offset:
+                    return False
+            return True
+
+        wait_until(has_finished_consuming,
+                   timeout_sec=timeout_sec,
+                   err_msg="Consumer failed to consume up to offsets %s after waiting %ds." %\
+                   (str(last_acked_offsets), timeout_sec))
+
+
+    def _collect_all_logs(self):
+        for s in self.test_context.services:
+            self.mark_for_collect(s)
+
+    def await_startup(self, min_records=5, timeout_sec=30):
+        try:
+            wait_until(lambda: self.consumer.total_consumed() >= min_records,
+                       timeout_sec=timeout_sec,
+                       err_msg="Timed out after %ds while awaiting initial record delivery of %d records" %\
+                       (timeout_sec, min_records))
+        except BaseException:
+            self._collect_all_logs()
+            raise
+
+    def run_validation(self, min_records=5000, producer_timeout_sec=30,
+                       consumer_timeout_sec=30, enable_idempotence=False):
+        try:
+            wait_until(lambda: self.producer.num_acked > min_records,
+                       timeout_sec=producer_timeout_sec,
+                       err_msg="Producer failed to produce messages for %ds." %\
+                       producer_timeout_sec)
+
+            self.logger.info("Stopping producer after writing up to offsets %s" %\
+                         str(self.producer.last_acked_offsets))
+            self.producer.stop()
+
+            self.await_consumed_offsets(self.producer.last_acked_offsets, consumer_timeout_sec)
+            self.consumer.stop()
+            
+            self.validate(enable_idempotence)
+        except BaseException:
+            self._collect_all_logs()
+            raise
+
+    def validate(self, enable_idempotence):
+        self.logger.info("Number of acked records: %d" % len(self.producer.acked))
+        self.logger.info("Number of consumed records: %d" % len(self.records_consumed))
+
+        def check_lost_data(missing_records):
+            return self.kafka.search_data_files(self.topic, missing_records)
+
+        succeeded, error_msg = validate_delivery(self.producer.acked, self.records_consumed,
+                                                 enable_idempotence, check_lost_data)
+
+        # Collect all logs if validation fails
+        if not succeeded:
+            self._collect_all_logs()
+
+        assert succeeded, error_msg
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index 6979b94..0d2b20c 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -15,6 +15,9 @@
 
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
+
+from kafkatest.utils import validate_delivery
+
 import time
 
 class ProduceConsumeValidateTest(Test):
@@ -112,68 +115,21 @@ class ProduceConsumeValidateTest(Test):
                 self.mark_for_collect(s)
             raise
 
-    @staticmethod
-    def annotate_missing_msgs(missing, acked, consumed, msg):
-        missing_list = list(missing)
-        msg += "%s acked message did not make it to the Consumer. They are: " %\
-            len(missing_list)
-        if len(missing_list) < 20:
-            msg += str(missing_list) + ". "
-        else:
-            msg += ", ".join(str(m) for m in missing_list[:20])
-            msg += "...plus %s more. Total Acked: %s, Total Consumed: %s. " \
-                   % (len(missing_list) - 20, len(set(acked)), len(set(consumed)))
-        return msg
-
-    @staticmethod
-    def annotate_data_lost(data_lost, msg, number_validated):
-        print_limit = 10
-        if len(data_lost) > 0:
-            msg += "The first %s missing messages were validated to ensure they are in Kafka's data files. " \
-                   "%s were missing. This suggests data loss. Here are some of the messages not found in the data files: %s\n" \
-                   % (number_validated, len(data_lost), str(data_lost[0:print_limit]) if len(data_lost) > print_limit else str(data_lost))
-        else:
-            msg += "We validated that the first %s of these missing messages correctly made it into Kafka's data files. " \
-                   "This suggests they were lost on their way to the consumer." % number_validated
-        return msg
-
     def validate(self):
-        """Check that each acked message was consumed."""
-        success = True
-        msg = ""
-        acked = self.producer.acked
-        consumed = self.consumer.messages_consumed[1]
-        # Correctness of the set difference operation depends on using equivalent message_validators in procuder and consumer
-        missing = set(acked) - set(consumed)
-
-        self.logger.info("num consumed:  %d" % len(consumed))
-
-        # Were all acked messages consumed?
-        if len(missing) > 0:
-            msg = self.annotate_missing_msgs(missing, acked, consumed, msg)
-            success = False
-
-            #Did we miss anything due to data loss?
-            to_validate = list(missing)[0:1000 if len(missing) > 1000 else len(missing)]
-            data_lost = self.kafka.search_data_files(self.topic, to_validate)
-            msg = self.annotate_data_lost(data_lost, msg, len(to_validate))
+        messages_consumed = self.consumer.messages_consumed[1]
 
+        self.logger.info("Number of acked records: %d" % len(self.producer.acked))
+        self.logger.info("Number of consumed records: %d" % len(messages_consumed))
 
-        if self.enable_idempotence:
-            self.logger.info("Ran a test with idempotence enabled. We expect no duplicates")
-        else:
-            self.logger.info("Ran a test with idempotence disabled.")
+        def check_lost_data(missing_records):
+            return self.kafka.search_data_files(self.topic, missing_records)
 
-        # Are there duplicates?
-        if len(set(consumed)) != len(consumed):
-            num_duplicates = abs(len(set(consumed)) - len(consumed))
-            msg += "(There are also %s duplicate messages in the log - but that is an acceptable outcome)\n" % num_duplicates
-            if self.enable_idempotence:
-                assert False, "Detected %s duplicates even though idempotence was enabled." % num_duplicates
+        succeeded, error_msg = validate_delivery(self.producer.acked, messages_consumed,
+                                                 self.enable_idempotence, check_lost_data)
 
         # Collect all logs if validation fails
-        if not success:
+        if not succeeded:
             for s in self.test_context.services:
                 self.mark_for_collect(s)
 
-        assert success, msg
+        assert succeeded, error_msg
diff --git a/tests/kafkatest/utils/__init__.py b/tests/kafkatest/utils/__init__.py
index 8c473bf..1c1d5e0 100644
--- a/tests/kafkatest/utils/__init__.py
+++ b/tests/kafkatest/utils/__init__.py
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable
+from util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable, validate_delivery
diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py
index dd20273..b9ccaf8 100644
--- a/tests/kafkatest/utils/util.py
+++ b/tests/kafkatest/utils/util.py
@@ -112,3 +112,59 @@ def node_is_reachable(src_node, dst_node):
     :return:                True only if dst is reachable from src.
     """
     return 0 == src_node.account.ssh("nc -w 3 -z %s 22" % dst_node.account.hostname, allow_fail=True)
+
+
+def annotate_missing_msgs(missing, acked, consumed, msg):
+    missing_list = list(missing)
+    msg += "%s acked message did not make it to the Consumer. They are: " %\
+        len(missing_list)
+    if len(missing_list) < 20:
+        msg += str(missing_list) + ". "
+    else:
+        msg += ", ".join(str(m) for m in missing_list[:20])
+        msg += "...plus %s more. Total Acked: %s, Total Consumed: %s. " \
+            % (len(missing_list) - 20, len(set(acked)), len(set(consumed)))
+    return msg
+
+def annotate_data_lost(data_lost, msg, number_validated):
+    print_limit = 10
+    if len(data_lost) > 0:
+        msg += "The first %s missing messages were validated to ensure they are in Kafka's data files. " \
+            "%s were missing. This suggests data loss. Here are some of the messages not found in the data files: %s\n" \
+            % (number_validated, len(data_lost), str(data_lost[0:print_limit]) if len(data_lost) > print_limit else str(data_lost))
+    else:
+        msg += "We validated that the first %s of these missing messages correctly made it into Kafka's data files. " \
+            "This suggests they were lost on their way to the consumer." % number_validated
+    return msg
+
+def validate_delivery(acked, consumed, idempotence_enabled=False, check_lost_data=None):
+    """Check that each acked message was consumed."""
+    success = True
+    msg = ""
+
+    # Correctness of the set difference operation depends on using equivalent
+    # message_validators in producer and consumer
+    missing = set(acked) - set(consumed)
+    
+    # Were all acked messages consumed?
+    if len(missing) > 0:
+        msg = annotate_missing_msgs(missing, acked, consumed, msg)
+        success = False
+        
+        # Did we miss anything due to data loss?
+        if check_lost_data:
+            to_validate = list(missing)[0:1000 if len(missing) > 1000 else len(missing)]
+            data_lost = check_lost_data(to_validate)
+            msg = annotate_data_lost(data_lost, msg, len(to_validate))
+
+    # Are there duplicates?
+    if len(set(consumed)) != len(consumed):
+        num_duplicates = abs(len(set(consumed)) - len(consumed))
+
+        if idempotence_enabled:
+            success = False
+            msg += "Detected %d duplicates even though idempotence was enabled.\n" % num_duplicates
+        else:
+            msg += "(There are also %d duplicate messages in the log - but that is an acceptable outcome)\n" % num_duplicates
+
+    return success, msg

