kafka-commits mailing list archives

From ewe...@apache.org
Subject kafka git commit: KAFKA-4719: Consumption timeout should take into account producer request timeout
Date Thu, 02 Feb 2017 18:49:17 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c8baf2854 -> 76550dd89


KAFKA-4719: Consumption timeout should take into account producer request timeout

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2479 from hachikuji/KAFKA-4719
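
In short: the system tests previously bounded consumption waits by twice the
consumer session timeout (20 seconds with the default session_timeout_sec=10),
which can expire while a produce request is still outstanding within its
request.timeout.ms. This patch pins the producer request timeout at 30 seconds
and derives the consumption timeout from both values. A minimal sketch of the
new rule, using the names from verifiable_consumer_test.py in the diff below:

    # Sketch of the consumption-timeout rule introduced by this patch.
    PRODUCER_REQUEST_TIMEOUT_SEC = 30

    def consumption_timeout_sec(session_timeout_sec=10):
        # Wait at least as long as a produce request may take (plus 5 seconds
        # of slack), and never less than twice the consumer session timeout.
        return max(PRODUCER_REQUEST_TIMEOUT_SEC + 5, 2 * session_timeout_sec)

    # With the test defaults this yields max(35, 20) == 35 seconds,
    # up from the previous 20.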


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/76550dd8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/76550dd8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/76550dd8

Branch: refs/heads/trunk
Commit: 76550dd8956b07f24713f10992fc15aeedabe2c9
Parents: c8baf28
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Feb 2 10:49:11 2017 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu Feb 2 10:49:11 2017 -0800

----------------------------------------------------------------------
 tests/kafkatest/services/verifiable_consumer.py | 32 +++++++++++++-------
 tests/kafkatest/services/verifiable_producer.py |  6 +++-
 tests/kafkatest/tests/client/consumer_test.py   |  6 ++--
 .../kafkatest/tests/verifiable_consumer_test.py | 13 +++++---
 4 files changed, 39 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/76550dd8/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index d7fca87..0139132 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -55,12 +55,17 @@ class ConsumerEventHandler(object):
                 partition = offset_commit["partition"]
                 tp = TopicPartition(topic, partition)
                 offset = offset_commit["offset"]
-                assert tp in self.assignment, "Committed offsets for a partition not assigned"
-                assert self.position[tp] >= offset, "The committed offset was greater than the current position"
+                assert tp in self.assignment, \
+                    "Committed offsets for partition %s not assigned (current assignment: %s)" % \
+                    (str(tp), str(self.assignment))
+                assert self.position[tp] >= offset, \
+                    "The committed offset %d was greater than the current position %d for partition %s" % \
+                    (offset, self.position[tp], str(tp))
                 self.committed[tp] = offset
 
     def handle_records_consumed(self, event):
-        assert self.state == ConsumerState.Joined, "Consumed records should only be received when joined"
+        assert self.state == ConsumerState.Joined, \
+            "Consumed records should only be received when joined (current state: %s)" % str(self.state)
 
         for record_batch in event["partitions"]:
             tp = TopicPartition(topic=record_batch["topic"],
@@ -68,9 +73,12 @@ class ConsumerEventHandler(object):
             min_offset = record_batch["minOffset"]
             max_offset = record_batch["maxOffset"]
 
-            assert tp in self.assignment, "Consumed records for a partition not assigned"
+            assert tp in self.assignment, \
+                "Consumed records for partition %s which is not assigned (current assignment: %s)" % \
+                (str(tp), str(self.assignment))
             assert tp not in self.position or self.position[tp] == min_offset, \
-                "Consumed from an unexpected offset (%s, %s)" % (str(self.position[tp]), str(min_offset))
+                "Consumed from an unexpected offset (%d, %d) for partition %s" % \
+                (self.position[tp], min_offset, str(tp))
             self.position[tp] = max_offset + 1 
 
         self.total_consumed += event["count"]
@@ -136,9 +144,9 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
     def __init__(self, context, num_nodes, kafka, topic, group_id,
                  max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
                  assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor",
-                 version=DEV_BRANCH, stop_timeout_sec=30):
+                 version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO"):
         super(VerifiableConsumer, self).__init__(context, num_nodes)
-        self.log_level = "TRACE"
+        self.log_level = log_level
         
         self.kafka = kafka
         self.topic = topic
@@ -204,13 +212,14 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
             if tp in self.global_committed:
                 # verify that the position never gets behind the current commit.
                 assert self.global_committed[tp] <= consumed_partition["minOffset"], \
-                    "Consumed position %d is behind the current committed offset %d" % (consumed_partition["minOffset"],
self.global_committed[tp])
+                    "Consumed position %d is behind the current committed offset %d for partition
%s" % \
+                    (consumed_partition["minOffset"], self.global_committed[tp], str(tp))
 
             # the consumer cannot generally guarantee that the position increases monotonically
             # without gaps in the face of hard failures, so we only log a warning when this happens
             if tp in self.global_position and self.global_position[tp] != consumed_partition["minOffset"]:
-                self.logger.warn("Expected next consumed offset of %d, but instead saw %d" %
-                                 (self.global_position[tp], consumed_partition["minOffset"]))
+                self.logger.warn("Expected next consumed offset of %d for partition %s, but instead saw %d" %
+                                 (self.global_position[tp], str(tp), consumed_partition["minOffset"]))
 
             self.global_position[tp] = consumed_partition["maxOffset"] + 1
 
@@ -220,7 +229,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
                 tp = TopicPartition(offset_commit["topic"], offset_commit["partition"])
                 offset = offset_commit["offset"]
                 assert self.global_position[tp] >= offset, \
-                    "committed offset is ahead of the current partition"
+                    "Committed offset %d for partition %s is ahead of the current position
%d" % \
+                    (offset, str(tp), self.global_position[tp])
                 self.global_committed[tp] = offset
 
     def start_cmd(self, node):

http://git-wip-us.apache.org/repos/asf/kafka/blob/76550dd8/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index f545634..bc327c7 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -51,7 +51,7 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
 
     def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000,
                  message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None,
-                 stop_timeout_sec=150):
+                 stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO"):
         """
         :param max_messages is a number of messages to be produced per producer
         :param message_validator checks for an expected format of messages produced. There are
@@ -64,6 +64,7 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
         compression types, one per producer (could be "none").
         """
         super(VerifiableProducer, self).__init__(context, num_nodes)
+        self.log_level = log_level
 
         self.kafka = kafka
         self.topic = topic
@@ -82,6 +83,7 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
         self.clean_shutdown_nodes = set()
         self.acks = acks
         self.stop_timeout_sec = stop_timeout_sec
+        self.request_timeout_sec = request_timeout_sec
 
     def prop_file(self, node):
         idx = self.idx(node)
@@ -109,6 +111,8 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
         if self.acks is not None:
             self.logger.info("VerifiableProducer (index = %d) will use acks = %s", idx, self.acks)
             producer_prop_file += "\nacks=%s\n" % self.acks
+
+        producer_prop_file += "\nrequest.timeout.ms=%d\n" % (self.request_timeout_sec * 1000)
         self.logger.info("verifiable_producer.properties:")
         self.logger.info(producer_prop_file)
         node.account.create_file(VerifiableProducer.CONFIG_FILE, producer_prop_file)
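
Note that the producer property request.timeout.ms is expressed in
milliseconds while the new request_timeout_sec parameter is in seconds, hence
the seconds-to-milliseconds conversion above. With the default of 30 seconds,
the line appended to the rendered properties file would read:

    request.timeout.ms=30000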

http://git-wip-us.apache.org/repos/asf/kafka/blob/76550dd8/tests/kafkatest/tests/client/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index ffaabbb..f66021d 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -151,11 +151,13 @@ class OffsetValidationTest(VerifiableConsumerTest):
             # if the total records consumed matches the current position, we haven't seen any duplicates
             # this can only be guaranteed with a clean shutdown
             assert consumer.current_position(partition) == consumer.total_consumed(), \
-                "Total consumed records did not match consumed position"
+                "Total consumed records %d did not match consumed position %d" % \
+                (consumer.total_consumed(), consumer.current_position(partition))
         else:
             # we may have duplicates in a hard failure
             assert consumer.current_position(partition) <= consumer.total_consumed(), \
-                "Current position greater than the total number of consumed records"
+                "Current position %d greater than the total number of consumed records %d" % \
+                (consumer.current_position(partition), consumer.total_consumed())
 
     @cluster(num_nodes=7)
     @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])

http://git-wip-us.apache.org/repos/asf/kafka/blob/76550dd8/tests/kafkatest/tests/verifiable_consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/verifiable_consumer_test.py b/tests/kafkatest/tests/verifiable_consumer_test.py
index 6417e61..2ba2a61 100644
--- a/tests/kafkatest/tests/verifiable_consumer_test.py
+++ b/tests/kafkatest/tests/verifiable_consumer_test.py
@@ -23,14 +23,16 @@ from kafkatest.services.verifiable_consumer import VerifiableConsumer
 from kafkatest.services.kafka import TopicPartition
 
 class VerifiableConsumerTest(KafkaTest):
+    PRODUCER_REQUEST_TIMEOUT_SEC = 30
 
     def __init__(self, test_context, num_consumers=1, num_producers=0,
-                 group_id="test_group_id", session_timeout_sec=10,  **kwargs):
+                 group_id="test_group_id", session_timeout_sec=10, **kwargs):
         super(VerifiableConsumerTest, self).__init__(test_context, **kwargs)
         self.num_consumers = num_consumers
         self.num_producers = num_producers
         self.group_id = group_id
         self.session_timeout_sec = session_timeout_sec
+        self.consumption_timeout_sec = max(self.PRODUCER_REQUEST_TIMEOUT_SEC + 5, 2 * session_timeout_sec)
 
     def _all_partitions(self, topic, num_partitions):
         partitions = set()
@@ -56,11 +58,14 @@ class VerifiableConsumerTest(KafkaTest):
     def setup_consumer(self, topic, enable_autocommit=False, assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor"):
         return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,
                                   topic, self.group_id, session_timeout_sec=self.session_timeout_sec,
-                                  assignment_strategy=assignment_strategy, enable_autocommit=enable_autocommit)
+                                  assignment_strategy=assignment_strategy, enable_autocommit=enable_autocommit,
+                                  log_level="TRACE")
 
     def setup_producer(self, topic, max_messages=-1):
         return VerifiableProducer(self.test_context, self.num_producers, self.kafka, topic,
-                                  max_messages=max_messages, throughput=500)
+                                  max_messages=max_messages, throughput=500,
+                                  request_timeout_sec=self.PRODUCER_REQUEST_TIMEOUT_SEC,
+                                  log_level="DEBUG")
 
     def await_produced_messages(self, producer, min_messages=1000, timeout_sec=10):
         current_acked = producer.num_acked
@@ -70,7 +75,7 @@ class VerifiableConsumerTest(KafkaTest):
     def await_consumed_messages(self, consumer, min_messages=1):
         current_total = consumer.total_consumed()
         wait_until(lambda: consumer.total_consumed() >= current_total + min_messages,
-                   timeout_sec=self.session_timeout_sec*2,
+                   timeout_sec=self.consumption_timeout_sec,
                    err_msg="Timed out waiting for consumption")
 
     def await_members(self, consumer, num_consumers):
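
Taken together with the producer change above: await_consumed_messages now
waits consumption_timeout_sec = max(30 + 5, 2 * 10) = 35 seconds under the
default settings rather than the old 2 * 10 = 20 seconds, so a produce request
that uses its full 30-second request timeout can no longer outlive the
consumption wait and fail the test spuriously.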

