This is an automated email from the ASF dual-hosted git repository.

boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new ce939e9 MINOR: Document that max.block.ms affects some transaction methods (#8975)
ce939e9 is described below

commit ce939e9136b7a1ea5343d464b7660f905212c053
Author: Tom Bentley
AuthorDate: Sat Jul 4 18:54:11 2020 +0100

    MINOR: Document that max.block.ms affects some transaction methods (#8975)

    The documentation for max.block.ms said it affected only send() and partitionsFor(), but it actually also affects initTransactions(), abortTransaction() and commitTransaction(). So rework the documentation to cover these methods too.

    Reviewers: Boyang Chen
---
 .../org/apache/kafka/clients/producer/ProducerConfig.java | 11 ++++++++---
 .../kafka/clients/producer/internals/RecordAccumulator.java | 2 +-
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index dd3c7f7..e4aee39 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -156,9 +156,14 @@ public class ProducerConfig extends AbstractConfig {
     /** max.block.ms */
     public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
-    private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block."
-                                                    + "These methods can be blocked either because the buffer is full or metadata unavailable."
-                                                    + "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";
+    private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long the KafkaProducer's send(), partitionsFor(), "
+                                                    + "initTransactions(), commitTransaction() "
+                                                    + "and abortTransaction() methods will block. "
+                                                    + "For send() this timeout bounds the total time waiting for both metadata fetch and buffer allocation "
+                                                    + "(blocking in the user-supplied serializers or partitioner is not counted against this timeout). "
+                                                    + "For partitionsFor() this timeout bounds the time spent waiting for metadata if it is unavailable. "
+                                                    + "The transaction-related methods always block, but may timeout if "
+                                                    + "the transaction coordinator could not be discovered or did not respond within the timeout.";

     /** buffer.memory */
     public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 68add17..3781297 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -210,7 +210,7 @@ public final class RecordAccumulator {
                 byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                 int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
-                log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
+                log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
                 buffer = free.allocate(size, maxTimeToBlock);

                 // Update the current time in case the buffer allocation blocked above.