kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6774; Improve the default group id behavior in KafkaConsumer (KIP-289) (#5877)
Date Fri, 16 Nov 2018 08:59:07 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c3e7d62  KAFKA-6774; Improve the default group id behavior in KafkaConsumer (KIP-289) (#5877)
c3e7d62 is described below

commit c3e7d6252c41d48e74379810595c978efada9efb
Author: Vahid Hashemian <vahid.hashemian@gmail.com>
AuthorDate: Fri Nov 16 00:58:56 2018 -0800

    KAFKA-6774; Improve the default group id behavior in KafkaConsumer (KIP-289) (#5877)
    
    Improve the default group id behavior by:
    * changing the default consumer group to null, where no offset commit or fetch, or group management operations are allowed
    * deprecating the use of empty (`""`) consumer group on the client
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../kafka/clients/consumer/ConsumerConfig.java     |   2 +-
 .../kafka/clients/consumer/KafkaConsumer.java      | 109 ++++++++++-------
 .../consumer/internals/AbstractCoordinator.java    |   5 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 134 +++++++++++++++++----
 .../ClientAuthenticationFailureTest.java           |   1 +
 .../kafka/api/IntegrationTestHarness.scala         |   4 +-
 .../kafka/api/PlaintextConsumerTest.scala          | 128 +++++++++++++++++++-
 docs/upgrade.html                                  |   9 ++
 8 files changed, 318 insertions(+), 74 deletions(-)
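
For readers skimming the patch, here is a minimal sketch of the client-facing behavior
this commit introduces. The sketch is not part of the commit; the broker address, topic
name, and class name are illustrative assumptions.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.InvalidGroupIdException;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class NullGroupIdSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // No GROUP_ID_CONFIG set: with this patch the default is null rather than "",
            // and enable.auto.commit, not being set explicitly, quietly defaults to false.

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Manual partition assignment and fetching still work without a group id ...
                consumer.assign(Collections.singleton(new TopicPartition("test_topic", 0)));
                consumer.poll(Duration.ofMillis(100));

                // ... but the group management and offset commit/fetch APIs now fail fast.
                try {
                    consumer.commitSync();
                } catch (InvalidGroupIdException e) {
                    // expected with a null group id
                }
            }
        }
    }

Note that explicitly setting enable.auto.commit=true without a group.id now fails at
construction time: per the constructor change and tests below, the InvalidConfigurationException
surfaces as the cause of the KafkaException thrown by the KafkaConsumer constructor.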

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 795a762..9cd5766 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -277,7 +277,7 @@ public class ConsumerConfig extends AbstractConfig {
                                            ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
                                         Importance.MEDIUM,
                                         CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
-                                .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
+                                .define(GROUP_ID_CONFIG, Type.STRING, null, Importance.HIGH, GROUP_ID_DOC)
                                 .define(SESSION_TIMEOUT_MS_CONFIG,
                                         Type.INT,
                                         10000,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3a75672..5c673a5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -37,6 +37,8 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.JmxReporter;
@@ -557,6 +559,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     private final Logger log;
     private final String clientId;
+    private String groupId;
     private final ConsumerCoordinator coordinator;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
@@ -654,18 +657,23 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     @SuppressWarnings("unchecked")
-    private KafkaConsumer(ConsumerConfig config,
-                          Deserializer<K> keyDeserializer,
-                          Deserializer<V> valueDeserializer) {
+    private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
         try {
             String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
             if (clientId.isEmpty())
                 clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
             this.clientId = clientId;
-            String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
-
+            this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
             LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
             this.log = logContext.logger(getClass());
+            boolean enableAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+            if (groupId == null) { // overwrite in case of default group id where the config is not explicitly provided
+                if (!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
+                    enableAutoCommit = false;
+                else if (enableAutoCommit)
+                    throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used.");
+            } else if (groupId.isEmpty())
+                log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
 
             log.debug("Initializing the Kafka consumer");
             this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
@@ -678,8 +686,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                     .tags(metricsTags);
             List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                    MetricsReporter.class,
-                    Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
+                    MetricsReporter.class, Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
@@ -691,16 +698,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     ConsumerInterceptor.class);
             this.interceptors = new ConsumerInterceptors<>(interceptorList);
             if (keyDeserializer == null) {
-                this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                        Deserializer.class);
+                this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
                 this.keyDeserializer.configure(config.originals(), true);
             } else {
                 config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
                 this.keyDeserializer = keyDeserializer;
             }
             if (valueDeserializer == null) {
-                this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                        Deserializer.class);
+                this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
                 this.valueDeserializer.configure(config.originals(), false);
             } else {
                 config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
@@ -710,17 +715,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
                     true, false, clusterResourceListeners);
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
-                    config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
-                    config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
+                    config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
             this.metadata.bootstrap(addresses, time.milliseconds());
             String metricGrpPrefix = "consumer";
             ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time);
-
             IsolationLevel isolationLevel = IsolationLevel.valueOf(
                     config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
             Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry.fetcherMetrics);
-
             int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
 
             NetworkClient netClient = new NetworkClient(
@@ -755,24 +757,26 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
             int maxPollIntervalMs = config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
             int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
-            this.coordinator = new ConsumerCoordinator(logContext,
-                    this.client,
-                    groupId,
-                    maxPollIntervalMs,
-                    sessionTimeoutMs,
-                    new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
-                    assignors,
-                    this.metadata,
-                    this.subscriptions,
-                    metrics,
-                    metricGrpPrefix,
-                    this.time,
-                    retryBackoffMs,
-                    config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
-                    config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
-                    this.interceptors,
-                    config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
-                    config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
+            // no coordinator will be constructed for the default (null) group id
+            this.coordinator = groupId == null ? null :
+                new ConsumerCoordinator(logContext,
+                        this.client,
+                        groupId,
+                        maxPollIntervalMs,
+                        sessionTimeoutMs,
+                        new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
+                        assignors,
+                        this.metadata,
+                        this.subscriptions,
+                        metrics,
+                        metricGrpPrefix,
+                        this.time,
+                        retryBackoffMs,
+                        enableAutoCommit,
+                        config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
+                        this.interceptors,
+                        config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
+                        config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
             this.fetcher = new Fetcher<>(
                     logContext,
                     this.client,
@@ -795,11 +799,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
-
             log.debug("Kafka consumer initialized");
         } catch (Throwable t) {
-            // call close methods if internal objects are already constructed
-            // this is to prevent resource leak. see KAFKA-2121
+            // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
             close(0, true);
             // now propagate the exception
             throw new KafkaException("Failed to construct kafka consumer", t);
@@ -822,7 +824,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                   long retryBackoffMs,
                   long requestTimeoutMs,
                   int defaultApiTimeoutMs,
-                  List<PartitionAssignor> assignors) {
+                  List<PartitionAssignor> assignors,
+                  String groupId) {
         this.log = logContext.logger(getClass());
         this.clientId = clientId;
         this.coordinator = coordinator;
@@ -839,6 +842,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         this.requestTimeoutMs = requestTimeoutMs;
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
         this.assignors = assignors;
+        this.groupId = groupId;
     }
 
     /**
@@ -911,9 +915,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
         acquireAndEnsureOpen();
         try {
-            if (topics == null) {
+            maybeThrowInvalidGroupIdException();
+            if (topics == null)
                 throw new IllegalArgumentException("Topic collection to subscribe to cannot
be null");
-            } else if (topics.isEmpty()) {
+            if (topics.isEmpty()) {
                 // treat subscribing to empty topic list as the same as unsubscribing
                 this.unsubscribe();
             } else {
@@ -980,6 +985,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+        maybeThrowInvalidGroupIdException();
         if (pattern == null)
             throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
 
@@ -1026,7 +1032,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         try {
             fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
             this.subscriptions.unsubscribe();
-            this.coordinator.maybeLeaveGroup();
+            if (this.coordinator != null)
+                this.coordinator.maybeLeaveGroup();
             this.metadata.needMetadataForAllTopics(false);
             log.info("Unsubscribed all topics or patterns and assigned partitions");
         } finally {
@@ -1073,7 +1080,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
                 // make sure the offsets of topic partitions the consumer is unsubscribing from
                 // are committed since there will be no following rebalance
-                this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
+                if (coordinator != null)
+                    this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
 
                 log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
                 this.subscriptions.assignFromUser(new HashSet<>(partitions));
@@ -1211,7 +1219,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Visible for testing
      */
     boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
-        if (!coordinator.poll(timer)) {
+        if (coordinator != null && !coordinator.poll(timer)) {
             return false;
         }
 
@@ -1219,7 +1227,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
-        long pollTimeout = Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
+        long pollTimeout = coordinator == null ? timer.remainingMs() :
+                Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
 
         // if data is available already, return it immediately
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
@@ -1249,7 +1258,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
         // after the long poll, we should check whether the group needs to rebalance
         // prior to returning data so that the group can stabilize faster
-        if (coordinator.rejoinNeededOrPending()) {
+        if (coordinator != null && coordinator.rejoinNeededOrPending()) {
             return Collections.emptyMap();
         }
 
@@ -1324,6 +1333,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public void commitSync(Duration timeout) {
         acquireAndEnsureOpen();
         try {
+            maybeThrowInvalidGroupIdException();
             if (!coordinator.commitOffsetsSync(subscriptions.allConsumed(), time.timer(timeout))) {
                 throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
                         "committing the current consumed offsets");
@@ -1406,6 +1416,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
         acquireAndEnsureOpen();
         try {
+            maybeThrowInvalidGroupIdException();
             if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) {
                 throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
                         "committing offsets " + offsets);
@@ -1475,6 +1486,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
         acquireAndEnsureOpen();
         try {
+            maybeThrowInvalidGroupIdException();
             log.debug("Committing offsets: {}", offsets);
             coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
         } finally {
@@ -1686,6 +1698,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
         acquireAndEnsureOpen();
         try {
+            maybeThrowInvalidGroupIdException();
             Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(
                     Collections.singleton(partition), time.timer(timeout));
             if (offsets == null) {
@@ -2163,7 +2176,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         // coordinator lookup if there are partitions which have missing positions, so
         // a consumer with manually assigned partitions can avoid a coordinator dependence
         // by always ensuring that assigned partitions have an initial position.
-        if (!coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false;
+        if (coordinator != null && !coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false;
 
         // If there are partitions still needing a position and a reset policy is defined,
         // request reset using the default policy. If no reset strategy is defined and there
@@ -2216,6 +2229,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
     }
 
+    private void maybeThrowInvalidGroupIdException() {
+        if (groupId == null)
+            throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
+                    "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
+    }
+
     // Visible for testing
     String getClientId() {
         return clientId;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d983087..335e0f2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -121,7 +121,7 @@ public abstract class AbstractCoordinator implements Closeable {
     private Generation generation = Generation.NO_GENERATION;
 
     private RequestFuture<Void> findCoordinatorFuture = null;
-    
+
     /**
      * Initialize the coordination manager.
      */
@@ -139,7 +139,8 @@ public abstract class AbstractCoordinator implements Closeable {
         this.log = logContext.logger(AbstractCoordinator.class);
         this.client = client;
         this.time = time;
-        this.groupId = groupId;
+        this.groupId = Objects.requireNonNull(groupId,
+                "Expected a non-null group id for coordinator construction");
         this.rebalanceTimeoutMs = rebalanceTimeoutMs;
         this.sessionTimeoutMs = sessionTimeoutMs;
         this.leaveGroupOnClose = leaveGroupOnClose;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 987bad2..3657077 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -35,6 +35,8 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
@@ -138,6 +140,8 @@ public class KafkaConsumerTest {
     // a concurrent heartbeat request
     private final int autoCommitIntervalMs = 500;
 
+    private final String groupId = "mock-group";
+
     @Rule
     public ExpectedException expectedException = ExpectedException.none();
 
@@ -203,7 +207,7 @@ public class KafkaConsumerTest {
 
     @Test
     public void testSubscription() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId);
 
         consumer.subscribe(singletonList(topic));
         assertEquals(singleton(topic), consumer.subscription());
@@ -226,21 +230,21 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnNullTopicCollection() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.subscribe((List<String>) null);
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnNullTopic() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.subscribe(singletonList((String) null));
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnEmptyTopic() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             String emptyTopic = "  ";
             consumer.subscribe(singletonList(emptyTopic));
         }
@@ -248,7 +252,7 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnNullPattern() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.subscribe((Pattern) null);
         }
     }
@@ -258,6 +262,7 @@ public class KafkaConsumerTest {
         Properties props = new Properties();
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "");
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(props)) {
             consumer.subscribe(singletonList(topic));
         }
@@ -265,7 +270,7 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testSeekNegative() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
             consumer.assign(singleton(new TopicPartition("nonExistTopic", 0)));
             consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
         }
@@ -273,14 +278,14 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnNullTopicPartition() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
             consumer.assign(null);
         }
     }
 
     @Test
     public void testAssignOnEmptyTopicPartition() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.assign(Collections.<TopicPartition>emptyList());
             assertTrue(consumer.subscription().isEmpty());
             assertTrue(consumer.assignment().isEmpty());
@@ -289,14 +294,14 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnNullTopicInPartition() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
             consumer.assign(singleton(new TopicPartition(null, 0)));
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnEmptyTopicInPartition() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
             consumer.assign(singleton(new TopicPartition("  ", 0)));
         }
     }
@@ -328,7 +333,7 @@ public class KafkaConsumerTest {
 
     @Test
     public void testPause() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId);
 
         consumer.assign(singletonList(tp0));
         assertEquals(singleton(tp0), consumer.assignment());
@@ -346,11 +351,19 @@ public class KafkaConsumerTest {
         consumer.close();
     }
 
-    private KafkaConsumer<byte[], byte[]> newConsumer() {
+    private KafkaConsumer<byte[], byte[]> newConsumer(String groupId) {
+        return newConsumer(groupId, Optional.empty());
+    }
+
+    private KafkaConsumer<byte[], byte[]> newConsumer(String groupId, Optional<Boolean> enableAutoCommit) {
         Properties props = new Properties();
         props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my.consumer");
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        if (groupId != null)
+            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        if (enableAutoCommit.isPresent())
+            props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit.get().toString());
         return newConsumer(props);
     }
 
@@ -554,7 +567,7 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                OffsetResetStrategy.NONE, true);
+                OffsetResetStrategy.NONE, true, groupId);
         consumer.assign(singletonList(tp0));
 
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -577,7 +590,7 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                OffsetResetStrategy.NONE, true);
+                OffsetResetStrategy.NONE, true, groupId);
         consumer.assign(singletonList(tp0));
 
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -601,7 +614,7 @@ public class KafkaConsumerTest {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                OffsetResetStrategy.LATEST, true);
+                OffsetResetStrategy.LATEST, true, groupId);
         consumer.assign(singletonList(tp0));
 
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -1210,14 +1223,14 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalStateException.class)
     public void testPollWithNoSubscription() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
             consumer.poll(Duration.ZERO);
         }
     }
 
     @Test(expected = IllegalStateException.class)
     public void testPollWithEmptySubscription() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.subscribe(Collections.<String>emptyList());
             consumer.poll(Duration.ZERO);
         }
@@ -1225,7 +1238,7 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalStateException.class)
     public void testPollWithEmptyUserAssignment() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.assign(Collections.<TopicPartition>emptySet());
             consumer.poll(Duration.ZERO);
         }
@@ -1265,13 +1278,78 @@ public class KafkaConsumerTest {
 
     @Test
     public void closeShouldBeIdempotent() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
         consumer.close();
         consumer.close();
         consumer.close();
     }
 
     @Test
+    public void testOperationsBySubscribingConsumerWithDefaultGroupId() {
+        try {
+            newConsumer(null, Optional.of(Boolean.TRUE));
+            fail("Expected an InvalidConfigurationException");
+        } catch (KafkaException e) {
+            assertEquals(InvalidConfigurationException.class, e.getCause().getClass());
+        }
+
+        try {
+            newConsumer((String) null).subscribe(Collections.singleton(topic));
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+
+        try {
+            newConsumer((String) null).committed(tp0);
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+
+        try {
+            newConsumer((String) null).commitAsync();
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+
+        try {
+            newConsumer((String) null).commitSync();
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+    }
+
+    @Test
+    public void testOperationsByAssigningConsumerWithDefaultGroupId() {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
+        consumer.assign(singleton(tp0));
+
+        try {
+            consumer.committed(tp0);
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+
+        try {
+            consumer.commitAsync();
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+
+        try {
+            consumer.commitSync();
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+    }
+
+    @Test
     public void testMetricConfigRecordingLevel() {
         Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
@@ -1681,13 +1759,20 @@ public class KafkaConsumerTest {
                                                       Metadata metadata,
                                                       PartitionAssignor assignor,
                                                       boolean autoCommitEnabled) {
-        return newConsumer(time, client, metadata, assignor, OffsetResetStrategy.EARLIEST, autoCommitEnabled);
+        return newConsumer(time, client, metadata, assignor, OffsetResetStrategy.EARLIEST, autoCommitEnabled, groupId);
     }
 
     private KafkaConsumer<String, String> newConsumerNoAutoCommit(Time time,
                                                                   KafkaClient client,
                                                                   Metadata metadata) {
-        return newConsumer(time, client, metadata, new RangeAssignor(), OffsetResetStrategy.EARLIEST, false);
+        return newConsumer(time, client, metadata, new RangeAssignor(), OffsetResetStrategy.EARLIEST, false, groupId);
+    }
+
+    private KafkaConsumer<String, String> newConsumer(Time time,
+                                                      KafkaClient client,
+                                                      Metadata metadata,
+                                                      String groupId) {
+        return newConsumer(time, client, metadata, new RangeAssignor(), OffsetResetStrategy.LATEST, true, groupId);
     }
 
     private KafkaConsumer<String, String> newConsumer(Time time,
@@ -1695,9 +1780,9 @@ public class KafkaConsumerTest {
                                                       Metadata metadata,
                                                       PartitionAssignor assignor,
                                                       OffsetResetStrategy resetStrategy,
-                                                      boolean autoCommitEnabled) {
+                                                      boolean autoCommitEnabled,
+                                                      String groupId) {
         String clientId = "mock-consumer";
-        String groupId = "mock-group";
         String metricGroupPrefix = "consumer";
         long retryBackoffMs = 100;
         int requestTimeoutMs = 30000;
@@ -1782,7 +1867,8 @@ public class KafkaConsumerTest {
                 retryBackoffMs,
                 requestTimeoutMs,
                 defaultApiTimeoutMs,
-                assignors);
+                assignors,
+                groupId);
     }
 
     private static class FetchInfo {
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index 938fe94..61606ab 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -84,6 +84,7 @@ public class ClientAuthenticationFailureTest {
     public void testConsumerWithInvalidCredentials() {
         Map<String, Object> props = new HashMap<>(saslClientConfigs);
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "");
         StringDeserializer deserializer = new StringDeserializer();
 
         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, deserializer, deserializer)) {
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 0e2797a..5a20005 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -113,10 +113,12 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
 
   def createConsumer[K, V](keyDeserializer: Deserializer[K] = new ByteArrayDeserializer,
                            valueDeserializer: Deserializer[V] = new ByteArrayDeserializer,
-                           configOverrides: Properties = new Properties): KafkaConsumer[K, V] = {
+                           configOverrides: Properties = new Properties,
+                           configsToRemove: List[String] = List()): KafkaConsumer[K, V] = {
     val props = new Properties
     props ++= consumerConfig
     props ++= configOverrides
+    configsToRemove.foreach(props.remove(_))
     val consumer = new KafkaConsumer[K, V](props, keyDeserializer, valueDeserializer)
     consumers += consumer
     consumer
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index c06a796..c11fc12 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -14,6 +14,7 @@ package kafka.api
 
 import java.time.Duration
 import java.util
+import java.util.Arrays.asList
 import java.util.regex.Pattern
 import java.util.{Collections, Locale, Optional, Properties}
 
@@ -23,7 +24,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{MetricName, TopicPartition}
-import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicException}
 import org.apache.kafka.common.header.Headers
 import org.apache.kafka.common.record.{CompressionType, TimestampType}
 import org.apache.kafka.common.serialization._
@@ -1814,4 +1815,129 @@ class PlaintextConsumerTest extends BaseConsumerTest {
         s"The current assignment is ${consumer.assignment()}")
   }
 
+  @Test
+  def testConsumingWithNullGroupId(): Unit = {
+    val topic = "test_topic"
+    val partition = 0
+    val tp = new TopicPartition(topic, partition)
+    createTopic(topic, 1, 1)
+
+    TestUtils.waitUntilTrue(() => {
+      this.zkClient.topicExists(topic)
+    }, "Failed to create topic")
+
+    val producer = createProducer()
+    producer.send(new ProducerRecord(topic, partition, "k1".getBytes, "v1".getBytes)).get()
+    producer.send(new ProducerRecord(topic, partition, "k2".getBytes, "v2".getBytes)).get()
+    producer.send(new ProducerRecord(topic, partition, "k3".getBytes, "v3".getBytes)).get()
+    producer.close()
+
+    // consumer 1 uses the default group id and consumes from earliest offset
+    val consumer1Config = new Properties(consumerConfig)
+    consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
+    val consumer1 = createConsumer(
+      configOverrides = consumer1Config,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+
+    // consumer 2 uses the default group id and consumes from latest offset
+    val consumer2Config = new Properties(consumerConfig)
+    consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2")
+    val consumer2 = createConsumer(
+      configOverrides = consumer2Config,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+
+    // consumer 3 uses the default group id and starts from an explicit offset
+    val consumer3Config = new Properties(consumerConfig)
+    consumer3Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer3")
+    val consumer3 = createConsumer(
+      configOverrides = consumer3Config,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+
+    consumer1.assign(asList(tp))
+    consumer2.assign(asList(tp))
+    consumer3.assign(asList(tp))
+    consumer3.seek(tp, 1)
+
+    val numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count()
+
+    try {
+      consumer1.commitSync()
+      fail("Expected offset commit to fail due to null group id")
+    } catch {
+      case e: InvalidGroupIdException => // OK
+    }
+
+    try {
+      consumer2.committed(tp)
+      fail("Expected committed offset fetch to fail due to null group id")
+    } catch {
+      case e: InvalidGroupIdException => // OK
+    }
+
+    val numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count()
+    val numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count()
+
+    consumer1.unsubscribe()
+    consumer2.unsubscribe()
+    consumer3.unsubscribe()
+
+    consumer1.close()
+    consumer2.close()
+    consumer3.close()
+
+    assertEquals("Expected consumer1 to consume from earliest offset", 3, numRecords1)
+    assertEquals("Expected consumer2 to consume from latest offset", 0, numRecords2)
+    assertEquals("Expected consumer3 to consume from offset 1", 2, numRecords3)
+  }
+
+  @Test
+  def testConsumingWithEmptyGroupId(): Unit = {
+    val topic = "test_topic"
+    val partition = 0
+    val tp = new TopicPartition(topic, partition)
+    createTopic(topic, 1, 1)
+
+    TestUtils.waitUntilTrue(() => {
+      this.zkClient.topicExists(topic)
+    }, "Failed to create topic")
+
+    val producer = createProducer()
+    producer.send(new ProducerRecord(topic, partition, "k1".getBytes, "v1".getBytes)).get()
+    producer.send(new ProducerRecord(topic, partition, "k2".getBytes, "v2".getBytes)).get()
+    producer.close()
+
+    // consumer 1 uses the empty group id
+    val consumer1Config = new Properties(consumerConfig)
+    consumer1Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
+    consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
+    consumer1Config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
+    val consumer1 = createConsumer(configOverrides = consumer1Config)
+
+    // consumer 2 uses the empty group id and consumes from latest offset if there is no committed offset
+    val consumer2Config = new Properties(consumerConfig)
+    consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    consumer2Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
+    consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2")
+    consumer2Config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
+    val consumer2 = createConsumer(configOverrides = consumer2Config)
+
+    consumer1.assign(asList(tp))
+    consumer2.assign(asList(tp))
+
+    val records1 = consumer1.poll(Duration.ofMillis(5000))
+    consumer1.commitSync()
+
+    val records2 = consumer2.poll(Duration.ofMillis(5000))
+    consumer2.commitSync()
+
+    consumer1.close()
+    consumer2.close()
+
+    assertTrue("Expected consumer1 to consume one message from offset 0",
+      records1.count() == 1 && records1.records(tp).asScala.head.offset == 0)
+    assertTrue("Expected consumer2 to consume one message from offset 1, which is the committed
offset of consumer1",
+      records2.count() == 1 && records2.records(tp).asScala.head.offset == 1)
+  }
 }
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 33d9964..154547b 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,15 @@
 
 <script id="upgrade-template" type="text/x-handlebars-template">
 
+<h4><a id="upgrade_2_2_0" href="#upgrade_2_2_0">Upgrading from 0.8.x, 0.9.x,
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x to 2.2.0</a></h4>
+<h5><a id="upgrade_220_notable" href="#upgrade_220_notable">Notable changes in
2.2.0</a></h5>
+<ul>
+    <li>The default consumer group id has been changed from the empty string (<code>""</code>) to <code>null</code>. Consumers who use the new default group id will not be able to subscribe to topics,
+        or fetch or commit offsets. The empty string as consumer group id is deprecated but will be supported until a future major release. Old clients that rely on the empty string group id will now
+        have to explicitly provide it as part of their consumer config. For more information see
+        <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer">KIP-289</a>.</li>
+</ul>
+
 <h4><a id="upgrade_2_1_0" href="#upgrade_2_1_0">Upgrading from 0.8.x, 0.9.x,
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, or 2.0.0 to 2.1.0</a></h4>
 
 <p><b>Note that 2.1.x contains a change to the internal schema used to store consumer offsets. Once the upgrade is
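
In practical terms, the upgrade note in the docs/upgrade.html hunk above means that a
client relying on the old implicit empty group id keeps its behavior only by opting in
explicitly, at the cost of a deprecation warning. A short sketch, reusing the imports
from the earlier example (the broker address is again an assumed placeholder):

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // Explicit opt-in to the pre-2.2 default. The client accepts it but logs:
    //   "Support for using the empty group id by consumers is deprecated
    //    and will be removed in the next major release."
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);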

