kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-5032; Update the docs for message size configs across the board
Date Tue, 27 Jun 2017 14:11:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 d292cad53 -> dd006c82c


KAFKA-5032; Update the docs for message size configs across the board

Before 0.11, we used to have limits for maximum message size on the producer, broker, and
consumer side.

From 0.11 onward, these limits apply to record batches as a whole. This patch updates
the documentation of the configs to make this explicit.

A separate patch will have more extensive upgrade notes to tie all the changes together in
one narrative.

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3374 from apurvam/KAFKA-5032-message-size-docs

(cherry picked from commit f1cc8008e5e77d0587921d727daff1eafb80f74c)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd006c82
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd006c82
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd006c82

Branch: refs/heads/0.11.0
Commit: dd006c82c2e6a55113cf419b9356ef968949648e
Parents: d292cad
Author: Apurva Mehta <apurva@confluent.io>
Authored: Tue Jun 27 14:24:44 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Jun 27 15:11:34 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  | 11 ++++++-----
 .../kafka/clients/producer/ProducerConfig.java  |  7 ++++---
 .../apache/kafka/common/config/TopicConfig.java |  9 ++++++---
 .../main/scala/kafka/server/KafkaConfig.scala   | 20 ++++++++++----------
 4 files changed, 26 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dd006c82/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 01b8c33..9cf3d77 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -124,9 +124,9 @@ public class ConsumerConfig extends AbstractConfig {
      */
     public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
     private static final String FETCH_MAX_BYTES_DOC = "The maximum amount of data the server should return for a fetch request. " +
-            "This is not an absolute maximum, if the first message in the first non-empty partition of the fetch is larger than " +
-            "this value, the message will still be returned to ensure that the consumer can make progress. " +
-            "The maximum message size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
+            "Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than " +
+            "this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. " +
+            "The maximum record batch size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
             "<code>max.message.bytes</code> (topic config). Note that the consumer performs multiple fetches in parallel.";
     public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;
 
@@ -144,8 +144,9 @@ public class ConsumerConfig extends AbstractConfig {
      */
     public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";
     private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server " +
-            "will return. If the first message in the first non-empty partition of the fetch is larger than this limit, the " +
-            "message will still be returned to ensure that the consumer can make progress. The maximum message size " +
+            "will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty " +
+            "partition of the fetch is larger than this limit, the " +
+            "batch will still be returned to ensure that the consumer can make progress. The maximum record batch size " +
             "accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
             "<code>max.message.bytes</code> (topic config). See " + FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size.";
     public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024;
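
For reference, a minimal sketch of a consumer using the two settings documented above. This is not part of the commit; the broker address, group id, and topic name are placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class FetchSizeExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder address
            props.put("group.id", "fetch-size-demo");          // placeholder group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Cap on the data returned for a whole fetch request; an oversized first
            // record batch is still returned so the consumer can make progress.
            props.put("fetch.max.bytes", 50 * 1024 * 1024);
            // Per-partition cap within the fetch response, with the same escape hatch.
            props.put("max.partition.fetch.bytes", 1024 * 1024);
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
                ConsumerRecords<String, String> records = consumer.poll(1000);
                records.forEach(r -> System.out.println(r.value()));
            }
        }
    }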

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd006c82/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 2059495..b1f59b1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -109,9 +109,10 @@ public class ProducerConfig extends AbstractConfig {
 
     /** <code>max.request.size</code> */
     public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
-    private static final String MAX_REQUEST_SIZE_DOC = "The maximum size of a request in bytes. This is also effectively a cap on the maximum record size. Note that the server "
-                                                       + "has its own cap on record size which may be different from this. This setting will limit the number of record "
-                                                       + "batches the producer will send in a single request to avoid sending huge requests.";
+    private static final String MAX_REQUEST_SIZE_DOC = "The maximum size of a request in bytes. This setting will limit the number of record "
+                                                       + "batches the producer will send in a single request to avoid sending huge requests. "
+                                                       + "This is also effectively a cap on the maximum record batch size. Note that the server "
+                                                       + "has its own cap on record batch size which may be different from this.";
 
     /** <code>reconnect.backoff.ms</code> */
     public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
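
For reference, a minimal producer sketch using the setting documented above. Not part of the commit; the address and topic are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class RequestSizeExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Caps the total request size and, per the doc change above, effectively
            // the largest record batch the producer will send; the broker applies its
            // own cap via message.max.bytes / max.message.bytes.
            props.put("max.request.size", 1024 * 1024);
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("demo-topic", "key", "value")); // placeholder topic
            }
        }
    }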

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd006c82/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index 554c97b..a868f94 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -74,9 +74,12 @@ public class TopicConfig {
         "their data.";
 
     public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes";
-    public static final String MAX_MESSAGE_BYTES_DOC = "This is largest message size Kafka will allow to be " +
-        "appended. Note that if you increase this size you must also increase your consumer's fetch size so " +
-        "they can fetch messages this large.";
+    public static final String MAX_MESSAGE_BYTES_DOC = "<p>The largest record batch size allowed by Kafka. If this " +
+        "is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that " +
+        "they can fetch record batches this large.</p>" +
+        "<p>In the latest message format version, records are always grouped into batches for efficiency. In previous " +
+        "message format versions, uncompressed records are not grouped into batches and this limit only applies to a " +
+        "single record in that case.</p>";
 
     public static final String INDEX_INTERVAL_BYTES_CONFIG = "index.interval.bytes";
     public static final String INDEX_INTERVAL_BYTES_DOCS = "This setting controls how frequently " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd006c82/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index fe47fd0..b37f2f7 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -28,7 +28,7 @@ import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.ConfigDef.ValidList
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig}
 import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
@@ -414,9 +414,8 @@ object KafkaConfig {
   val BrokerIdDoc = "The broker id for this server. If unset, a unique broker id will be generated." +
  "To avoid conflicts between zookeeper generated broker id's and user configured broker id's, generated broker ids " +
  "start from " + MaxReservedBrokerIdProp + " + 1."
-  val MessageMaxBytesDoc = "The maximum message size that the server can receive. Note that this limit also applies " +
-    "to the total size of a compressed batch of messages (when compression is enabled). Additionally, in versions " +
-    "0.11 and later, all messages are written as batches and this setting applies to the total size of the batch."
+  val MessageMaxBytesDoc = TopicConfig.MAX_MESSAGE_BYTES_DOC +
+    s"<p>This can be set per topic with the topic level <code>${TopicConfig.MAX_MESSAGE_BYTES_CONFIG}</code> config.</p>"
   val NumNetworkThreadsDoc = "The number of threads that the server uses for receiving requests from the network and sending responses to the network"
   val NumIoThreadsDoc = "The number of threads that the server uses for processing requests, which may include disk I/O"
   val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks"
@@ -542,16 +541,17 @@ object KafkaConfig {
   val ReplicaSocketTimeoutMsDoc = "The socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms"
   val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for network requests"
   val ReplicaFetchMaxBytesDoc = "The number of bytes of messages to attempt to fetch for each partition. This is not an absolute maximum, " +
-    "if the first message in the first non-empty partition of the fetch is larger than this value, the message will still be returned " +
-    "to ensure that progress can be made. The maximum message size accepted by the broker is defined via " +
+    "if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned " +
+    "to ensure that progress can be made. The maximum record batch size accepted by the broker is defined via " +
     "<code>message.max.bytes</code> (broker config) or <code>max.message.bytes</code> (topic config)."
   val ReplicaFetchWaitMaxMsDoc = "max wait time for each fetcher request issued by follower replicas. This value should always be less than the " +
  "replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR for low throughput topics"
   val ReplicaFetchMinBytesDoc = "Minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs"
-  val ReplicaFetchResponseMaxBytesDoc = "Maximum bytes expected for the entire fetch response. This is not an absolute maximum, " +
-    "if the first message in the first non-empty partition of the fetch is larger than this value, the message will still be returned " +
-    "to ensure that progress can be made. The maximum message size accepted by the broker is defined via " +
-    "<code>message.max.bytes</code> (broker config) or <code>max.message.bytes</code> (topic config)."
+  val ReplicaFetchResponseMaxBytesDoc = "Maximum bytes expected for the entire fetch response. Records are fetched in batches, " +
+    "and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch " +
+    "will still be returned to ensure that progress can be made. As such, this is not an absolute maximum. The maximum " +
+    "record batch size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
+    "<code>max.message.bytes</code> (topic config)."
   val NumReplicaFetchersDoc = "Number of fetcher threads used to replicate messages from a source broker. " +
  "Increasing this value can increase the degree of I/O parallelism in the follower broker."
   val ReplicaFetchBackoffMsDoc = "The amount of time to sleep when fetch partition error occurs."
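
For reference, message.max.bytes and the replica.fetch.* settings above are broker configs, normally set in server.properties. A sketch of reading the effective values with the AdminClient's describeConfigs (also available in 0.11); the address and broker id are placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.common.config.ConfigResource;

    public class BrokerBatchSizeCapsExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder address
            try (AdminClient admin = AdminClient.create(props)) {
                ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0"); // placeholder id
                Config config = admin.describeConfigs(Collections.singleton(broker))
                        .all().get().get(broker);
                // message.max.bytes bounds the record batch size the broker accepts;
                // replica.fetch.max.bytes should be at least as large so followers can
                // replicate any batch the leader accepted.
                System.out.println(config.get("message.max.bytes").value());
                System.out.println(config.get("replica.fetch.max.bytes").value());
            }
        }
    }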

