This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 16ad358 KAFKA-6868; Fix buffer underflow and expose group state in the consumer
groups API (#4980)
16ad358 is described below
commit 16ad358d64a138fc4b455379745ae1550a93d57b
Author: Colin Patrick McCabe <colin@cmccabe.xyz>
AuthorDate: Mon May 21 08:37:35 2018 -0700
KAFKA-6868; Fix buffer underflow and expose group state in the consumer groups API (#4980)
* The consumer groups API should expose group state and coordinator information. This
information is needed by administrative tools and scripts that access consume groups.
* The partition assignment will be empty when the group is rebalancing. Fix an issue where
the adminclient attempted to deserialize this empty buffer.
* Remove nulls from the API and make all collections immutable.
* DescribeConsumerGroupsResult#all should return a result as expected, rather than Void
* Fix exception text for GroupIdNotFoundException, GroupNotEmptyException. It was being
filled in as "The group id The group id does not exist was not found" and similar.
Reviewers: Attila Sasvari <asasvari@apache.org>, Andras Beni <andrasbeni@cloudera.com>,
Dong Lin <lindong28@gmail.com>, Jason Gustafson <jason@confluent.io>
---
.../clients/admin/ConsumerGroupDescription.java | 78 +++++++++-----
.../admin/DescribeConsumerGroupsResult.java | 26 ++++-
.../kafka/clients/admin/KafkaAdminClient.java | 51 +++++----
.../kafka/clients/admin/MemberAssignment.java | 13 ++-
.../kafka/clients/admin/MemberDescription.java | 45 ++++----
.../apache/kafka/common/ConsumerGroupState.java | 61 +++++++++++
.../common/errors/GroupIdNotFoundException.java | 12 +--
.../common/errors/GroupNotEmptyException.java | 12 +--
.../main/scala/kafka/tools/StreamsResetter.java | 3 +-
.../kafka/api/AdminClientIntegrationTest.scala | 120 ++++++++++++++++++++-
10 files changed, 315 insertions(+), 106 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
index 0bfa8a7..bc3857d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
@@ -17,55 +17,56 @@
package org.apache.kafka.clients.admin;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.utils.Utils;
-import java.util.List;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
/**
* A detailed description of a single consumer group in the cluster.
*/
public class ConsumerGroupDescription {
-
private final String groupId;
private final boolean isSimpleConsumerGroup;
- private final List<MemberDescription> members;
+ private final Collection<MemberDescription> members;
private final String partitionAssignor;
+ private final ConsumerGroupState state;
+ private final Node coordinator;
- /**
- * Creates an instance with the specified parameters.
- *
- * @param groupId The consumer group id
- * @param isSimpleConsumerGroup If Consumer Group is simple
- * @param members The consumer group members
- * @param partitionAssignor The consumer group partition assignor
- */
- public ConsumerGroupDescription(String groupId, boolean isSimpleConsumerGroup, List<MemberDescription>
members, String partitionAssignor) {
- this.groupId = groupId;
+ ConsumerGroupDescription(String groupId,
+ boolean isSimpleConsumerGroup,
+ Collection<MemberDescription> members,
+ String partitionAssignor,
+ ConsumerGroupState state,
+ Node coordinator) {
+ this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
- this.members = members;
- this.partitionAssignor = partitionAssignor;
+ this.members = members == null ? Collections.<MemberDescription>emptyList()
:
+ Collections.unmodifiableList(new ArrayList<>(members));
+ this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor;
+ this.state = state;
+ this.coordinator = coordinator;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
-
ConsumerGroupDescription that = (ConsumerGroupDescription) o;
-
- if (isSimpleConsumerGroup != that.isSimpleConsumerGroup) return false;
- if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != null) return
false;
- if (members != null ? !members.equals(that.members) : that.members != null) return
false;
- return partitionAssignor != null ? partitionAssignor.equals(that.partitionAssignor)
: that.partitionAssignor == null;
+ return isSimpleConsumerGroup == that.isSimpleConsumerGroup &&
+ groupId.equals(that.groupId) &&
+ members.equals(that.members) &&
+ partitionAssignor.equals(that.partitionAssignor) &&
+ state.equals(that.state);
}
@Override
public int hashCode() {
- int result = groupId != null ? groupId.hashCode() : 0;
- result = 31 * result + (isSimpleConsumerGroup ? 1 : 0);
- result = 31 * result + (members != null ? members.hashCode() : 0);
- result = 31 * result + (partitionAssignor != null ? partitionAssignor.hashCode()
: 0);
- return result;
+ return Objects.hash(isSimpleConsumerGroup, groupId, members, partitionAssignor, state);
}
/**
@@ -85,7 +86,7 @@ public class ConsumerGroupDescription {
/**
* A list of the members of the consumer group.
*/
- public List<MemberDescription> members() {
+ public Collection<MemberDescription> members() {
return members;
}
@@ -96,9 +97,28 @@ public class ConsumerGroupDescription {
return partitionAssignor;
}
+ /**
+ * The consumer group state, or UNKNOWN if the state is too new for us to parse.
+ */
+ public ConsumerGroupState state() {
+ return state;
+ }
+
+ /**
+ * The consumer group coordinator, or null if the coordinator is not known.
+ */
+ public Node coordinator() {
+ return coordinator;
+ }
+
@Override
public String toString() {
- return "(groupId=" + groupId + ", isSimpleConsumerGroup=" + isSimpleConsumerGroup
+ ", members=" +
- Utils.join(members, ",") + ", partitionAssignor=" + partitionAssignor + ")";
+ return "(groupId=" + groupId +
+ ", isSimpleConsumerGroup=" + isSimpleConsumerGroup +
+ ", members=" + Utils.join(members, ",") +
+ ", partitionAssignor=" + partitionAssignor +
+ ", state=" + state +
+ ", coordinator=" + coordinator +
+ ")";
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
index ac2189c..8f0ebad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
@@ -21,7 +21,9 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
/**
@@ -39,16 +41,32 @@ public class DescribeConsumerGroupsResult {
}
/**
- * Return a map from group id to futures which can be used to check the description of
a consumer group.
+ * Return a map from group id to futures which yield group descriptions.
*/
public Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups()
{
return futures;
}
/**
- * Return a future which succeeds only if all the consumer group description succeed.
+ * Return a future which yields all ConsumerGroupDescription objects, if all the describes
succeed.
*/
- public KafkaFuture<Void> all() {
- return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+ public KafkaFuture<Map<String, ConsumerGroupDescription>> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(
+ new KafkaFuture.BaseFunction<Void, Map<String, ConsumerGroupDescription>>()
{
+ @Override
+ public Map<String, ConsumerGroupDescription> apply(Void v) {
+ try {
+ Map<String, ConsumerGroupDescription> descriptions = new HashMap<>(futures.size());
+ for (Map.Entry<String, KafkaFuture<ConsumerGroupDescription>>
entry : futures.entrySet()) {
+ descriptions.put(entry.getKey(), entry.getValue().get());
+ }
+ return descriptions;
+ } catch (InterruptedException | ExecutionException e) {
+ // This should be unreachable, since the KafkaFuture#allOf already
ensured
+ // that all of the futures completed successfully.
+ throw new RuntimeException(e);
+ }
+ }
+ });
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index c9e0e18..5f4eefe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
@@ -120,11 +121,9 @@ import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -2347,14 +2346,21 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse abstractResponse) {
- final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
+ final FindCoordinatorResponse fcResponse = (FindCoordinatorResponse)
abstractResponse;
+ Errors error = fcResponse.error();
+ if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
+ // Retry COORDINATOR_NOT_AVAILABLE, in case the error is temporary.
+ throw error.exception();
+ } else if (error != Errors.NONE) {
+ // All other errors are immediate failures.
+ KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
+ future.completeExceptionally(error.exception());
+ return;
+ }
final long nowDescribeConsumerGroups = time.milliseconds();
-
- final int nodeId = response.node().id();
-
+ final int nodeId = fcResponse.node().id();
runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId))
{
-
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
@@ -2375,24 +2381,29 @@ public class KafkaAdminClient extends AdminClient {
final String protocolType = groupMetadata.protocolType();
if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) ||
protocolType.isEmpty()) {
final List<DescribeGroupsResponse.GroupMember>
members = groupMetadata.members();
- final List<MemberDescription> consumers = new ArrayList<>(members.size());
+ final List<MemberDescription> memberDescriptions
= new ArrayList<>(members.size());
for (DescribeGroupsResponse.GroupMember groupMember :
members) {
- final PartitionAssignor.Assignment assignment =
- ConsumerProtocol.deserializeAssignment(
- ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment())));
-
+ Set<TopicPartition> partitions = Collections.emptySet();
+ if (groupMember.memberAssignment().remaining() >
0) {
+ final PartitionAssignor.Assignment assignment
= ConsumerProtocol.
+ deserializeAssignment(groupMember.memberAssignment().duplicate());
+ partitions = new HashSet<>(assignment.partitions());
+ }
final MemberDescription memberDescription =
- new MemberDescription(
- groupMember.memberId(),
- groupMember.clientId(),
- groupMember.clientHost(),
- new MemberAssignment(assignment.partitions()));
- consumers.add(memberDescription);
+ new MemberDescription(groupMember.memberId(),
+ groupMember.clientId(),
+ groupMember.clientHost(),
+ new MemberAssignment(partitions));
+ memberDescriptions.add(memberDescription);
}
- final String protocol = groupMetadata.protocol();
final ConsumerGroupDescription consumerGroupDescription
=
- new ConsumerGroupDescription(groupId, protocolType.isEmpty(),
consumers, protocol);
+ new ConsumerGroupDescription(groupId,
+ protocolType.isEmpty(),
+ memberDescriptions,
+ groupMetadata.protocol(),
+ ConsumerGroupState.parse(groupMetadata.state()),
+ fcResponse.node());
future.complete(consumerGroupDescription);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
index bd95813..6c180ad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
@@ -19,21 +19,24 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
-import java.util.List;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
/**
* A description of the assignments of a specific group member.
*/
public class MemberAssignment {
- private final List<TopicPartition> topicPartitions;
+ private final Set<TopicPartition> topicPartitions;
/**
* Creates an instance with the specified parameters.
*
* @param topicPartitions List of topic partitions
*/
- public MemberAssignment(List<TopicPartition> topicPartitions) {
- this.topicPartitions = topicPartitions;
+ MemberAssignment(Set<TopicPartition> topicPartitions) {
+ this.topicPartitions = topicPartitions == null ? Collections.<TopicPartition>emptySet()
:
+ Collections.unmodifiableSet(new HashSet<>(topicPartitions));
}
@Override
@@ -54,7 +57,7 @@ public class MemberAssignment {
/**
* The topic partitions assigned to a group member.
*/
- public List<TopicPartition> topicPartitions() {
+ public Set<TopicPartition> topicPartitions() {
return topicPartitions;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
index 2ba1963..895abad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
@@ -17,49 +17,42 @@
package org.apache.kafka.clients.admin;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collections;
+import java.util.Objects;
+
/**
* A detailed description of a single group instance in the cluster.
*/
public class MemberDescription {
-
private final String memberId;
private final String clientId;
private final String host;
private final MemberAssignment assignment;
- /**
- * Creates an instance with the specified parameters.
- *
- * @param memberId The consumer id
- * @param clientId The client id
- * @param host The host
- * @param assignment The assignment
- */
- public MemberDescription(String memberId, String clientId, String host, MemberAssignment
assignment) {
- this.memberId = memberId;
- this.clientId = clientId;
- this.host = host;
- this.assignment = assignment;
+ MemberDescription(String memberId, String clientId, String host, MemberAssignment assignment)
{
+ this.memberId = memberId == null ? "" : memberId;
+ this.clientId = clientId == null ? "" : clientId;
+ this.host = host == null ? "" : host;
+ this.assignment = assignment == null ?
+ new MemberAssignment(Collections.<TopicPartition>emptySet()) : assignment;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
-
MemberDescription that = (MemberDescription) o;
-
- if (memberId != null ? !memberId.equals(that.memberId) : that.memberId != null) return
false;
- if (clientId != null ? !clientId.equals(that.clientId) : that.clientId != null) return
false;
- return assignment != null ? assignment.equals(that.assignment) : that.assignment
== null;
+ return memberId.equals(that.memberId) &&
+ clientId.equals(that.clientId) &&
+ host.equals(that.host) &&
+ assignment.equals(that.assignment);
}
@Override
public int hashCode() {
- int result = memberId != null ? memberId.hashCode() : 0;
- result = 31 * result + (clientId != null ? clientId.hashCode() : 0);
- result = 31 * result + (assignment != null ? assignment.hashCode() : 0);
- return result;
+ return Objects.hash(memberId, clientId, host, assignment);
}
/**
@@ -92,7 +85,9 @@ public class MemberDescription {
@Override
public String toString() {
- return "(memberId=" + memberId + ", clientId=" + clientId + ", host=" + host + ",
assignment=" +
- assignment + ")";
+ return "(memberId=" + memberId +
+ ", clientId=" + clientId +
+ ", host=" + host +
+ ", assignment=" + assignment + ")";
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java b/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java
new file mode 100644
index 0000000..7f3d4f0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common;
+
+import java.util.HashMap;
+
+/**
+ * The consumer group state.
+ */
+public enum ConsumerGroupState {
+ UNKNOWN("Unknown"),
+ PREPARING_REBALANCE("PreparingRebalance"),
+ COMPLETING_REBALANCE("CompletingRebalance"),
+ STABLE("Stable"),
+ DEAD("Dead"),
+ EMPTY("Empty");
+
+ private final static HashMap<String, ConsumerGroupState> NAME_TO_ENUM;
+
+ static {
+ NAME_TO_ENUM = new HashMap<>();
+ for (ConsumerGroupState state : ConsumerGroupState.values()) {
+ NAME_TO_ENUM.put(state.name, state);
+ }
+ }
+
+ private final String name;
+
+ ConsumerGroupState(String name) {
+ this.name = name;
+ }
+
+
+ /**
+ * Parse a string into a consumer group state.
+ */
+ public static ConsumerGroupState parse(String name) {
+ ConsumerGroupState state = NAME_TO_ENUM.get(name);
+ return state == null ? UNKNOWN : state;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
b/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
index 1ff30f1..a4d509d 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
@@ -17,15 +17,7 @@
package org.apache.kafka.common.errors;
public class GroupIdNotFoundException extends ApiException {
- private final String groupId;
-
- public GroupIdNotFoundException(String groupId) {
- super("The group id " + groupId + " was not found");
- this.groupId = groupId;
- }
-
- public String groupId() {
- return groupId;
+ public GroupIdNotFoundException(String message) {
+ super(message);
}
-
}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
b/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
index 264e613..e15b3e6 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
@@ -17,15 +17,7 @@
package org.apache.kafka.common.errors;
public class GroupNotEmptyException extends ApiException {
- private final String groupId;
-
- public GroupNotEmptyException(String groupId) {
- super("The group " + groupId + " is not empty");
- this.groupId = groupId;
- }
-
- public String groupId() {
- return groupId;
+ public GroupNotEmptyException(String message) {
+ super(message);
}
-
}
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index d7c4e43..3c045c6 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -157,7 +157,8 @@ public class StreamsResetter {
final AdminClient adminClient) throws ExecutionException,
InterruptedException {
final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(Arrays.asList(groupId),
(new DescribeConsumerGroupsOptions()).timeoutMs(10 * 1000));
- final List<MemberDescription> members = describeResult.describedGroups().get(groupId).get().members();
+ final List<MemberDescription> members =
+ new ArrayList<MemberDescription>(describeResult.describedGroups().get(groupId).get().members());
if (!members.isEmpty()) {
throw new IllegalStateException("Consumer group '" + groupId + "' is still active
"
+ "and has following members: " + members + ". "
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index b31c09d..e7dd108 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -31,10 +31,10 @@ import org.apache.kafka.clients.admin._
import kafka.utils.{Logging, TestUtils}
import kafka.utils.Implicits._
import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.{KafkaFuture, TopicPartition, TopicPartitionReplica}
+import org.apache.kafka.common.{ConsumerGroupState, KafkaFuture, TopicPartition, TopicPartitionReplica}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
@@ -49,6 +49,7 @@ import scala.collection.JavaConverters._
import java.lang.{Long => JLong}
import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.internals.Topic
import org.scalatest.Assertions.intercept
import scala.concurrent.duration.Duration
@@ -98,6 +99,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging
{
config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value)
config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}")
config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
+ config.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
// We set this in order to test that we don't expose sensitive data via describe configs.
This will already be
// set for subclasses with security enabled and we don't want to overwrite it.
if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp))
@@ -959,6 +961,120 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
client.close()
assertEquals(1, factory.failuresInjected)
}
+
+ /**
+ * Test the consumer group APIs.
+ */
+ @Test
+ def testConsumerGroups(): Unit = {
+ val config = createConfig()
+ val client = AdminClient.create(config)
+ try {
+ // Verify that initially there are no consumer groups to list.
+ val list1 = client.listConsumerGroups()
+ assertTrue(0 == list1.all().get().size())
+ assertTrue(0 == list1.errors().get().size())
+ assertTrue(0 == list1.valid().get().size())
+ val testTopicName = "test_topic"
+ val testNumPartitions = 2
+ client.createTopics(Collections.singleton(
+ new NewTopic(testTopicName, testNumPartitions, 1))).all().get()
+ val producer = createNewProducer
+ try {
+ producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+ } finally {
+ Utils.closeQuietly(producer, "producer")
+ }
+ val testGroupId = "test_group_id"
+ val testClientId = "test_client_id"
+ val fakeGroupId = "fake_group_id"
+ val newConsumerConfig = new Properties(consumerConfig)
+ newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
+ newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
+ val consumer = TestUtils.createNewConsumer(brokerList,
+ securityProtocol = this.securityProtocol,
+ trustStoreFile = this.trustStoreFile,
+ saslProperties = this.clientSaslProperties,
+ props = Some(newConsumerConfig))
+ try {
+ // Start a consumer in a thread that will subscribe to a new group.
+ val consumerThread = new Thread {
+ override def run {
+ consumer.subscribe(Collections.singleton(testTopicName))
+ while (true) {
+ consumer.poll(5000)
+ consumer.commitSync()
+ }
+ }
+ }
+ try {
+ consumerThread.start
+ // Test that we can list the new group.
+ TestUtils.waitUntilTrue(() => {
+ val matching = client.listConsumerGroups().all().get().asScala.
+ filter(listing => listing.groupId().equals(testGroupId))
+ !matching.isEmpty
+ }, s"Expected to be able to list $testGroupId")
+
+ val result = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava)
+ assertEquals(2, result.describedGroups().size())
+
+ // Test that we can get information about the test consumer group.
+ assertTrue(result.describedGroups().containsKey(testGroupId))
+ val testGroupDescription = result.describedGroups().get(testGroupId).get()
+ assertEquals(testGroupId, testGroupDescription.groupId())
+ assertFalse(testGroupDescription.isSimpleConsumerGroup())
+ assertEquals(1, testGroupDescription.members().size())
+ val member = testGroupDescription.members().iterator().next()
+ assertEquals(testClientId, member.clientId())
+ val topicPartitions = member.assignment().topicPartitions()
+ assertEquals(testNumPartitions, topicPartitions.size())
+ assertEquals(testNumPartitions, topicPartitions.asScala.
+ count(tp => tp.topic().equals(testTopicName)))
+
+ // Test that the fake group is listed as dead.
+ assertTrue(result.describedGroups().containsKey(fakeGroupId))
+ val fakeGroupDescription = result.describedGroups().get(fakeGroupId).get()
+ assertEquals(fakeGroupId, fakeGroupDescription.groupId())
+ assertEquals(0, fakeGroupDescription.members().size())
+ assertEquals("", fakeGroupDescription.partitionAssignor())
+ assertEquals(ConsumerGroupState.DEAD, fakeGroupDescription.state())
+
+ // Test that all() returns 2 results
+ assertEquals(2, result.all().get().size())
+
+ // Test listConsumerGroupOffsets
+ val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
+ TestUtils.waitUntilTrue(() => {
+ val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
+ val part = new TopicPartition(testTopicName, 0)
+ parts.containsKey(part) && (parts.get(part).offset() == 1)
+ }, s"Expected the offset for partition 0 to eventually become 1.")
+
+ // Test consumer group deletion
+ val deleteResult = client.deleteConsumerGroups(Seq(testGroupId, fakeGroupId).asJava)
+ assertEquals(2, deleteResult.deletedGroups().size())
+
+ // Deleting the fake group ID should get GroupIdNotFoundException.
+ assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId))
+ assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(fakeGroupId),
+ classOf[GroupIdNotFoundException])
+
+ // Deleting the real group ID should get GroupNotEmptyException
+ assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
+ assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(testGroupId),
+ classOf[GroupNotEmptyException])
+ } finally {
+ consumerThread.interrupt()
+ consumerThread.join()
+ }
+ } finally {
+ Utils.closeQuietly(consumer, "consumer")
+ }
+ } finally {
+ Utils.closeQuietly(client, "adminClient")
+ }
+ }
}
object AdminClientIntegrationTest {
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.
|