kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-8157: fix the incorrect usage of segment.index.bytes (2.2) (#6547)
Date Sun, 07 Apr 2019 04:24:53 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 7c214bd  KAFKA-8157: fix the incorrect usage of segment.index.bytes (2.2) (#6547)
7c214bd is described below

commit 7c214bd64a0b98b439c05c4c95a50faf4b1a95a1
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Sat Apr 6 21:08:41 2019 -0700

    KAFKA-8157: fix the incorrect usage of segment.index.bytes (2.2) (#6547)
    
    Should be cherry-picked to older branches as well.
    
    Reviewers: Bill Bejeck <bbejeck@gmail.com>
---
 .../java/org/apache/kafka/streams/StreamsConfig.java  | 19 +++++++------------
 .../org/apache/kafka/streams/StreamsConfigTest.java   | 12 ++++++++++--
 2 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0795389..f47e0dd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -808,22 +808,17 @@ public class StreamsConfig extends AbstractConfig {
 
         // verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
         final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
+        final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
 
-        if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG))) {
-            final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)).toString());
-            final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
-            final int batchSize;
-            if (producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) {
-                batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString());
-            } else {
-                final ProducerConfig producerDefaultConfig = new ProducerConfig(new Properties());
-                batchSize = producerDefaultConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG);
-            }
+        if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)) &&
+            producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) {
+            final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)).toString());
+            final int batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString());
 
             if (segmentSize < batchSize) {
                 throw new IllegalArgumentException(String.format("Specified topic segment size %d is is smaller than the configured producer batch size %d, this will cause produced batch not able to be appended to the topic",
-                        segmentSize,
-                        batchSize));
+                    segmentSize,
+                    batchSize));
             }
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index a873e4c..27d42e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -48,6 +48,7 @@ import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
 import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
+import static org.apache.kafka.streams.StreamsConfig.topicPrefix;
 import static org.apache.kafka.test.StreamsTestUtils.minimalStreamsConfig;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
@@ -110,7 +111,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:host");
         props.put(StreamsConfig.RETRIES_CONFIG, 10);
         props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 5);
-        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
+        props.put(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
 
         final String groupId = "example-application";
@@ -124,7 +125,7 @@ public class StreamsConfigTest {
         assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
         assertEquals(null, returnedProps.get(StreamsConfig.RETRIES_CONFIG));
         assertEquals(5, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
-        assertEquals(100, returnedProps.get(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)));
+        assertEquals(100, returnedProps.get(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)));
     }
 
     @Test
@@ -565,6 +566,13 @@ public class StreamsConfigTest {
 
 
 
+    @Test(expected = IllegalArgumentException.class)
+    public void testThrowIllegalArgumentExceptionWhenTopicSegmentSizeSmallerThanProducerBatchSize() {
+        props.put(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
+        props.put(producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 101);
+        new StreamsConfig(props).getConsumerConfigs("groupId", "clientId");
+    }
+
     static class MisconfiguredSerde implements Serde {
         @Override
         public void configure(final Map configs, final boolean isKey) {


Mime
View raw message