kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4965: set internal.leave.group.on.close to false in StreamsConfig
Date Fri, 12 May 2017 23:54:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 05c13552f -> 767cc31a8


KAFKA-4965: set internal.leave.group.on.close to false in StreamsConfig

Backport from trunk

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3032 from dguy/kafka-4965-bp


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/767cc31a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/767cc31a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/767cc31a

Branch: refs/heads/0.10.2
Commit: 767cc31a8929efbcc827a3724700090330f948e6
Parents: 05c1355
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri May 12 16:54:41 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri May 12 16:54:41 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/streams/StreamsConfig.java     |  2 +-
 .../java/org/apache/kafka/streams/StreamsConfigTest.java |  9 +++++++++
 .../streams/integration/GlobalKTableIntegrationTest.java |  1 +
 .../integration/InternalTopicIntegrationTest.java        |  1 +
 .../kafka/streams/integration/JoinIntegrationTest.java   |  1 +
 .../KStreamAggregationDedupIntegrationTest.java          |  1 +
 .../integration/KStreamAggregationIntegrationTest.java   |  1 +
 .../integration/KStreamKTableJoinIntegrationTest.java    |  1 +
 .../streams/integration/KStreamRepartitionJoinTest.java  |  1 +
 .../KStreamsFineGrainedAutoResetIntegrationTest.java     |  1 +
 .../integration/KTableKTableJoinIntegrationTest.java     |  1 +
 .../integration/QueryableStateIntegrationTest.java       |  2 +-
 .../streams/integration/RegexSourceIntegrationTest.java  | 11 +++++++----
 .../kafka/streams/integration/ResetIntegrationTest.java  |  1 +
 .../streams/integration/utils/IntegrationTestUtils.java  |  1 +
 15 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index bc73f24..d9c5f26 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -405,6 +405,7 @@ public class StreamsConfig extends AbstractConfig {
         tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
         tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
         // MAX_POLL_INTERVAL_MS_CONFIG needs to be large for streams to handle cases when
         // streams is recovering data from state stores. We may set it to Integer.MAX_VALUE
since
         // the streams code itself catches most exceptions and acts accordingly without needing
@@ -412,7 +413,6 @@ public class StreamsConfig extends AbstractConfig {
         // are losing the ability to detect them by setting this value to large. Hopefully
         // deadlocks happen very rarely or never.
         tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
-
         CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index f03bed9..15cc1af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.hamcrest.CoreMatchers;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -37,6 +38,7 @@ import java.util.Properties;
 
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -257,6 +259,13 @@ public class StreamsConfigTest {
         streamsConfig.getRestoreConsumerConfigs("client");
     }
 
+    @Test
+    public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() throws Exception
{
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null,
"group", "client");
+        assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false));
+    }
+
     static class MisconfiguredSerde implements Serde {
         @Override
         public void configure(final Map configs, final boolean isKey) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index a844a3c..d6a3c90 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -94,6 +94,7 @@ public class GlobalKTableIntegrationTest {
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), globalOne, globalStore);
         stream = builder.stream(Serdes.String(), Serdes.Long(), inputStream);
         table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table");

http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 20556c4..be6a5da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -89,6 +89,7 @@ public class InternalTopicIntegrationTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
     }
 
     private Properties getTopicConfigProperties(final String changelog) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 0c052f5..fc68516 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -127,6 +127,7 @@ public class JoinIntegrationTest {
         STREAMS_CONFIG.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
             30000,

http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index f2a767c..5e7d02b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -86,6 +86,7 @@ public class KStreamAggregationDedupIntegrationTest {
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024
* 1024L);
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.<Integer,
String>SelectValueMapper();
         stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);

http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index f6708a7..ce30212 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -102,6 +102,7 @@ public class KStreamAggregationIntegrationTest {
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         final KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.SelectValueMapper();
         stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);

http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 0a16494..7ad5710 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -92,6 +92,7 @@ public class KStreamKTableJoinIntegrationTest {
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
             TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 43e5d87..ebe5114 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -105,6 +105,7 @@ public class KStreamRepartitionJoinTest {
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
         streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);

http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index fe73b73..fc4d218 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -131,6 +131,7 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
 
         Properties props = new Properties();
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         streamsConfiguration = StreamsTestUtils.getStreamsConfig(
                 "testAutoOffsetId",

http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 322dd59..3574b21 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -139,6 +139,7 @@ public class KTableKTableJoinIntegrationTest {
         streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         final Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 0e4cdc7..64e8459 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -143,7 +143,7 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        // override this to make the rebalances happen quickly
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         stringComparator = new Comparator<KeyValue<String, String>>() {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 0ea36ea..a5ec8db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -111,10 +111,13 @@ public class RegexSourceIntegrationTest {
 
     @Before
     public void setUp() {
-
-        streamsConfiguration = StreamsTestUtils.getStreamsConfig(CLUSTER.bootstrapServers(),
-            STRING_SERDE_CLASSNAME,
-            STRING_SERDE_CLASSNAME);
+        final Properties properties = new Properties();
+        properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+        streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test",
+                                                                 CLUSTER.bootstrapServers(),
+                                                                 STRING_SERDE_CLASSNAME,
+                                                                 STRING_SERDE_CLASSNAME,
+                                                                 properties);
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index d653d00..f032f5a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -277,6 +277,7 @@ public class ResetIntegrationTest {
         streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 08e22cc..38ef64d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -55,6 +55,7 @@ public class IntegrationTestUtils {
 
     public static final int UNLIMITED_MESSAGES = -1;
     public static final long DEFAULT_TIMEOUT = 30 * 1000L;
+    public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close";
 
     /**
      * Returns up to `maxMessages` message-values from the topic.


Mime
View raw message