kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-8157: fix the incorrect usage of segment.index.bytes (2.2) (#6547)
Date Sun, 07 Apr 2019 04:12:29 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 28f0893  KAFKA-8157: fix the incorrect usage of segment.index.bytes (2.2) (#6547)
28f0893 is described below

commit 28f089382b740a9b0e2b0fb3711c9cd5353a2965
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Sat Apr 6 21:08:41 2019 -0700

    KAFKA-8157: fix the incorrect usage of segment.index.bytes (2.2) (#6547)
    
    Should be cherry-picked to older branches as well.
    
    Reviewers: Bill Bejeck <bbejeck@gmail.com>
---
 .../java/org/apache/kafka/streams/StreamsConfig.java  | 19 +++++++------------
 .../org/apache/kafka/streams/StreamsConfigTest.java   | 12 ++++++++++--
 2 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 5b76a18..430403a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -988,22 +988,17 @@ public class StreamsConfig extends AbstractConfig {
 
         // verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
         final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
+        final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
 
-        if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG))) {
-            final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)).toString());
-            final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
-            final int batchSize;
-            if (producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) {
-                batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString());
-            } else {
-                final ProducerConfig producerDefaultConfig = new ProducerConfig(new Properties());
-                batchSize = producerDefaultConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG);
-            }
+        if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)) &&
+            producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) {
+            final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)).toString());
+            final int batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString());
 
             if (segmentSize < batchSize) {
                 throw new IllegalArgumentException(String.format("Specified topic segment size %d is is smaller than the configured producer batch size %d, this will cause produced batch not able to be appended to the topic",
-                        segmentSize,
-                        batchSize));
+                    segmentSize,
+                    batchSize));
             }
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 7a837c0..05995fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -49,6 +49,7 @@ import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION;
 import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
+import static org.apache.kafka.streams.StreamsConfig.topicPrefix;
 import static org.apache.kafka.test.StreamsTestUtils.minimalStreamsConfig;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
@@ -111,7 +112,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:host");
         props.put(StreamsConfig.RETRIES_CONFIG, 10);
         props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 5);
-        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
+        props.put(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
 
         final String groupId = "example-application";
@@ -125,7 +126,7 @@ public class StreamsConfigTest {
         assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
         assertNull(returnedProps.get(StreamsConfig.RETRIES_CONFIG));
         assertEquals(5, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
-        assertEquals(100, returnedProps.get(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)));
+        assertEquals(100, returnedProps.get(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)));
     }
 
     @Test
@@ -629,6 +630,13 @@ public class StreamsConfigTest {
         new StreamsConfig(props);
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void testThrowIllegalArgumentExceptionWhenTopicSegmentSizeSmallerThanProducerBatchSize() {
+        props.put(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
+        props.put(producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 101);
+        new StreamsConfig(props).getMainConsumerConfigs("groupId", "clientId");
+    }
+
     static class MisconfiguredSerde implements Serde {
         @Override
         public void configure(final Map configs, final boolean isKey) {


Mime
View raw message