kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: [KAFKA-7379] [streams] send.buffer.bytes should be allowed to set -1 in KafkaStreams (#5643)
Date Thu, 20 Sep 2018 19:20:17 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7bc1019  [KAFKA-7379] [streams] send.buffer.bytes should be allowed to set -1 in
KafkaStreams (#5643)
7bc1019 is described below

commit 7bc1019d4bdf2b9592a7fc3e08659aae7961046b
Author: Aleksei Izmalkin <42375035+aai95@users.noreply.github.com>
AuthorDate: Thu Sep 20 23:20:03 2018 +0400

    [KAFKA-7379] [streams] send.buffer.bytes should be allowed to set -1 in KafkaStreams (#5643)
    
    What changes were proposed in this pull request?
    The hard-coded lower bounds (atLeast(0) in StreamsConfig, atLeast(-1) in ProducerConfig and ConsumerConfig) were replaced by
atLeast(SEND_BUFFER_LOWER_BOUND) and atLeast(RECEIVE_BUFFER_LOWER_BOUND), using the new constants (both -1) from CommonClientConfigs.
    
    How was this patch tested?
    Three unit tests were added to KafkaStreamsTest
    
    Reviewers: Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>,
Matthias J. Sax <mjsax@apache.org>
---
 .../apache/kafka/clients/CommonClientConfigs.java  |  2 ++
 .../kafka/clients/consumer/ConsumerConfig.java     |  4 ++--
 .../kafka/clients/producer/ProducerConfig.java     |  4 ++--
 .../org/apache/kafka/streams/StreamsConfig.java    |  4 ++--
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 25 ++++++++++++++++++++++
 5 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 7da7a60..a08e5b1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -46,9 +46,11 @@ public class CommonClientConfigs {
 
     public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
     public static final String SEND_BUFFER_DOC = "The size of the TCP send buffer (SO_SNDBUF)
to use when sending data. If the value is -1, the OS default will be used.";
+    public static final int SEND_BUFFER_LOWER_BOUND = -1;
 
     public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
     public static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer (SO_RCVBUF)
to use when reading data. If the value is -1, the OS default will be used.";
+    public static final int RECEIVE_BUFFER_LOWER_BOUND = -1;
 
     public static final String CLIENT_ID_CONFIG = "client.id";
     public static final String CLIENT_ID_DOC = "An id string to pass to the server when making
requests. The purpose of this is to be able to track the source of requests beyond just ip/port
by allowing a logical application name to be included in server-side request logging.";
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 1fcabca..ddd6e06 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -313,13 +313,13 @@ public class ConsumerConfig extends AbstractConfig {
                                 .define(SEND_BUFFER_CONFIG,
                                         Type.INT,
                                         128 * 1024,
-                                        atLeast(-1),
+                                        atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND),
                                         Importance.MEDIUM,
                                         CommonClientConfigs.SEND_BUFFER_DOC)
                                 .define(RECEIVE_BUFFER_CONFIG,
                                         Type.INT,
                                         64 * 1024,
-                                        atLeast(-1),
+                                        atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND),
                                         Importance.MEDIUM,
                                         CommonClientConfigs.RECEIVE_BUFFER_DOC)
                                 .define(FETCH_MIN_BYTES_CONFIG,
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 0a55303..1a1bab5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -251,8 +251,8 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(LINGER_MS_CONFIG, Type.INT, 0, atLeast(0), Importance.MEDIUM,
LINGER_MS_DOC)
                                 .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000,
atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM,
CommonClientConfigs.CLIENT_ID_DOC)
-                                .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1),
Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
-                                .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(-1),
Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
+                                .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND),
Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
+                                .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND),
Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
                                 .define(MAX_REQUEST_SIZE_CONFIG,
                                         Type.INT,
                                         1 * 1024 * 1024,
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 736e9cb..20f2f69 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -630,7 +630,7 @@ public class StreamsConfig extends AbstractConfig {
             .define(RECEIVE_BUFFER_CONFIG,
                     Type.INT,
                     32 * 1024,
-                    atLeast(0),
+                    atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND),
                     Importance.LOW,
                     CommonClientConfigs.RECEIVE_BUFFER_DOC)
             .define(RECONNECT_BACKOFF_MS_CONFIG,
@@ -671,7 +671,7 @@ public class StreamsConfig extends AbstractConfig {
             .define(SEND_BUFFER_CONFIG,
                     Type.INT,
                     128 * 1024,
-                    atLeast(0),
+                    atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND),
                     Importance.LOW,
                     CommonClientConfigs.SEND_BUFFER_DOC)
             .define(STATE_CLEANUP_DELAY_MS_CONFIG,
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index dd5cff7..5e07703 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,12 +16,15 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
@@ -96,6 +99,28 @@ public class KafkaStreamsTest {
     }
 
     @Test
+    public void testOsDefaultSocketBufferSizes() {
+        props.put(CommonClientConfigs.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
+        props.put(CommonClientConfigs.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        streams.close();
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testInvalidSocketSendBufferSize() {
+        props.put(CommonClientConfigs.SEND_BUFFER_CONFIG, -2);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        streams.close();
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testInvalidSocketReceiveBufferSize() {
+        props.put(CommonClientConfigs.RECEIVE_BUFFER_CONFIG, -2);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        streams.close();
+    }
+
+    @Test
     public void testStateChanges() throws InterruptedException {
         final StateListenerStub stateListener = new StateListenerStub();
         globalStreams.setStateListener(stateListener);


Mime
View raw message