kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6677: Fixed StreamsConfig producer's max-in-flight allowed when EOS enabled. (#4868)
Date Tue, 24 Apr 2018 18:13:17 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a05e336  KAFKA-6677: Fixed StreamsConfig producer's max-in-flight allowed when EOS
enabled. (#4868)
a05e336 is described below

commit a05e33693b66ac38ccb21f2238c194ca59fcb6ec
Author: Jagadesh Adireddi <adireddijagadesh@gmail.com>
AuthorDate: Tue Apr 24 23:43:12 2018 +0530

    KAFKA-6677: Fixed StreamsConfig producer's max-in-flight allowed when EOS enabled. (#4868)
    
    Reviewers: Matthias J Sax <matthias@confluentio>, Bill Bejeck <bill@confluent.io>
---
 .../org/apache/kafka/streams/StreamsConfig.java    |  8 ++++--
 .../apache/kafka/streams/StreamsConfigTest.java    | 30 +++++++++-------------
 2 files changed, 18 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 65b1da6..e46d6d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
@@ -119,7 +120,7 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
  * <ul>
  *   <li>{@link ConsumerConfig#ISOLATION_LEVEL_CONFIG "isolation.level"} (read_committed)
- Consumers will always read committed data only</li>
  *   <li>{@link ProducerConfig#ENABLE_IDEMPOTENCE_CONFIG "enable.idempotence"} (true)
- Producer will always have idempotency enabled</li>
- *   <li>{@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION "max.in.flight.requests.per.connection"}
(1) - Producer will always have one in-flight request per connection</li>
+ *   <li>{@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION "max.in.flight.requests.per.connection"}
(5) - Producer will always have one in-flight request per connection</li>
  * </ul>
  *
  *
@@ -650,7 +651,6 @@ public class StreamsConfig extends AbstractConfig {
         final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
         tempProducerDefaultOverrides.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
         tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
-        tempProducerDefaultOverrides.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
1);
 
         PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
@@ -785,6 +785,10 @@ public class StreamsConfig extends AbstractConfig {
         // consumer/producer configurations, log a warning and remove the user defined value
from the Map.
         // Thus the default values for these consumer/producer configurations that are suitable
for
         // Streams will be used instead.
+        final Object maxInflightRequests = clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+        if (eosEnabled && maxInflightRequests != null && 5 < (int) maxInflightRequests)
{
+            throw new ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
+ " can't exceed 5 when using the idempotent producer");
+        }
         for (final String config: nonConfigurableConfigs) {
             if (clientProvidedProps.containsKey(config)) {
                 final String eosMessage =  PROCESSING_GUARANTEE_CONFIG + " is set to " +
EXACTLY_ONCE + ". Hence, ";
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 87f7075..ef5e5a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -419,23 +419,6 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldResetToDefaultIfProducerMaxInFlightRequestPerConnectionsIsOverriddenIfEosEnabled()
{
-        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
-        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "anyValue");
-        final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
-        assertThat((Integer) producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
equalTo(1));
-    }
-
-    @Test
-    public void shouldAllowSettingProducerMaxInFlightRequestPerConnectionsWhenEosDisabled()
{
-        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 2);
-        final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
-        assertThat((Integer) producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
equalTo(2));
-    }
-
-    @Test
     public void shouldSetDifferentDefaultsIfEosEnabled() {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -446,7 +429,6 @@ public class StreamsConfigTest {
         assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
         assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
         assertThat((Integer) producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(Integer.MAX_VALUE));
-        assertThat((Integer) producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
equalTo(1));
         assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(100L));
     }
 
@@ -563,6 +545,18 @@ public class StreamsConfigTest {
         }
     }
 
+    @Test
+    public void shouldThrowExceptionIfMaxInflightRequestsGreatherThanFiveIfEosEnabled() {
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7);
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        try {
+            streamsConfig.getProducerConfigs("clientId");
+            fail("Should throw ConfigException when Eos is enabled and maxInFlight requests
exceeds 5");
+        } catch (final ConfigException e) {
+            assertEquals("max.in.flight.requests.per.connection can't exceed 5 when using
the idempotent producer", e.getMessage());
+        }
+    }
 
 
     static class MisconfiguredSerde implements Serde {

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message