kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6773; Allow offset commit/fetch/describe/delete with empty groupId (#4851)
Date Wed, 11 Apr 2018 23:47:16 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7421f9d  KAFKA-6773; Allow offset commit/fetch/describe/delete with empty groupId
(#4851)
7421f9d is described below

commit 7421f9dce2d2e763f70602b13df1efcd32f691b9
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Wed Apr 11 16:47:11 2018 -0700

    KAFKA-6773; Allow offset commit/fetch/describe/delete with empty groupId (#4851)
    
    We had a regression in #4788 which caused the offset commit/fetch/describe APIs to fail
if the groupId was empty. This should be allowed for backwards compatibility. Additionally,
I have modified DeleteGroups to allow removal of the empty group, which was missed in the
initial implementation. I've added a test case to ensure that we do not miss this again in
the future.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../kafka/coordinator/group/GroupCoordinator.scala |  43 +++++----
 .../kafka/admin/DeleteConsumerGroupsTest.scala     |  26 ------
 .../coordinator/group/GroupCoordinatorTest.scala   | 100 ++++++++++++++-------
 3 files changed, 96 insertions(+), 73 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 225b709..cbbd913 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -27,7 +27,7 @@ import kafka.utils.Logging
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH, NO_PRODUCER_ID}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
@@ -109,7 +109,7 @@ class GroupCoordinator(val brokerId: Int,
                       protocolType: String,
                       protocols: List[(String, Array[Byte])],
                       responseCallback: JoinCallback): Unit = {
-    validateGroup(groupId).foreach { error =>
+    validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error =>
       responseCallback(joinError(memberId, error))
       return
     }
@@ -237,7 +237,7 @@ class GroupCoordinator(val brokerId: Int,
                       memberId: String,
                       groupAssignment: Map[String, Array[Byte]],
                       responseCallback: SyncCallback): Unit = {
-    validateGroup(groupId) match {
+    validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match {
       case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS =>
         // The coordinator is loading, which means we've lost the state of the active rebalance
and the
         // group will need to start over at JoinGroup. By returning rebalance in progress,
the consumer
@@ -313,7 +313,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors =>
Unit): Unit = {
-    validateGroup(groupId).foreach { error =>
+    validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).foreach { error =>
       responseCallback(error)
       return
     }
@@ -346,7 +346,7 @@ class GroupCoordinator(val brokerId: Int,
     var groupsEligibleForDeletion: Seq[GroupMetadata] = Seq()
 
     groupIds.foreach { groupId =>
-      validateGroup(groupId) match {
+      validateGroupStatus(groupId, ApiKeys.DELETE_GROUPS) match {
         case Some(error) =>
           groupErrors += groupId -> error
 
@@ -386,7 +386,7 @@ class GroupCoordinator(val brokerId: Int,
                       memberId: String,
                       generationId: Int,
                       responseCallback: Errors => Unit) {
-    validateGroup(groupId).foreach { error =>
+    validateGroupStatus(groupId, ApiKeys.HEARTBEAT).foreach { error =>
       if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS)
         // the group is still loading, so respond just blindly
         responseCallback(Errors.NONE)
@@ -448,7 +448,7 @@ class GroupCoordinator(val brokerId: Int,
                              producerEpoch: Short,
                              offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                              responseCallback: immutable.Map[TopicPartition, Errors] =>
Unit): Unit = {
-    validateGroup(groupId) match {
+    validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match {
       case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
       case None =>
         val group = groupManager.getGroup(groupId).getOrElse {
@@ -463,7 +463,7 @@ class GroupCoordinator(val brokerId: Int,
                           generationId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                           responseCallback: immutable.Map[TopicPartition, Errors] => Unit)
{
-    validateGroup(groupId) match {
+    validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match {
       case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
       case None =>
         groupManager.getGroup(groupId) match {
@@ -524,7 +524,7 @@ class GroupCoordinator(val brokerId: Int,
   def handleFetchOffsets(groupId: String, partitions: Option[Seq[TopicPartition]] = None):
   (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
 
-    validateGroup(groupId) match {
+    validateGroupStatus(groupId, ApiKeys.OFFSET_FETCH) match {
       case Some(error) => error -> Map.empty
       case None =>
         // return offsets blindly regardless the current group state since the group may
be using
@@ -543,7 +543,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
-    validateGroup(groupId) match {
+    validateGroupStatus(groupId, ApiKeys.DESCRIBE_GROUPS) match {
       case Some(error) => (error, GroupCoordinator.EmptyGroup)
       case None =>
         groupManager.getGroup(groupId) match {
@@ -563,8 +563,23 @@ class GroupCoordinator(val brokerId: Int,
     info(s"Removed $offsetsRemoved offsets associated with deleted partitions: ${topicPartitions.mkString(",
")}.")
   }
 
-  private def validateGroup(groupId: String): Option[Errors] = {
-    if (!validGroupId(groupId))
+  private def isValidGroupId(groupId: String, api: ApiKeys): Boolean = {
+    api match {
+      case ApiKeys.OFFSET_COMMIT | ApiKeys.OFFSET_FETCH | ApiKeys.DESCRIBE_GROUPS | ApiKeys.DELETE_GROUPS
=>
+        // For backwards compatibility, we support the offset commit APIs for the empty groupId,
and also
+        // in DescribeGroups and DeleteGroups so that users can view and delete state of
all groups.
+        groupId != null
+      case _ =>
+        // The remaining APIs are groups using Kafka for group coordination and must have
a non-empty groupId
+        groupId != null && !groupId.isEmpty
+    }
+  }
+
+  /**
+   * Check that the groupId is valid, assigned to this coordinator and that the group has
been loaded.
+   */
+  private def validateGroupStatus(groupId: String, api: ApiKeys): Option[Errors] = {
+    if (!isValidGroupId(groupId, api))
       Some(Errors.INVALID_GROUP_ID)
     else if (!isActive.get)
       Some(Errors.COORDINATOR_NOT_AVAILABLE)
@@ -648,10 +663,6 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  private def validGroupId(groupId: String): Boolean = {
-    groupId != null && !groupId.isEmpty
-  }
-
   private def joinError(memberId: String, error: Errors): JoinGroupResult = {
     JoinGroupResult(
       members = Map.empty,
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
index effa55d..4cc2837 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -60,32 +60,6 @@ class DeleteConsumerGroupTest extends ConsumerGroupCommandTest {
   }
 
   @Test
-  def testDeleteCmdInvalidGroupId() {
-    TestUtils.createOffsetsTopic(zkClient, servers)
-    val invalidGroupId = ""
-
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId)
-    val service = getConsumerGroupService(cgcArgs)
-
-    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
-    assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting
consumer group",
-      output.contains(s"Group '$invalidGroupId' could not be deleted due to: ${Errors.INVALID_GROUP_ID.toString}"))
-  }
-
-  @Test
-  def testDeleteInvalidGroupId() {
-    TestUtils.createOffsetsTopic(zkClient, servers)
-    val invalidGroupId = ""
-
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId)
-    val service = getConsumerGroupService(cgcArgs)
-
-    val result = service.deleteGroups()
-    assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting
consumer group",
-      result.size == 1 && result.keySet.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID))
-  }
-
-  @Test
   def testDeleteCmdNonEmptyGroup() {
     TestUtils.createOffsetsTopic(zkClient, servers)
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 08c13eb..933e91b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -37,7 +37,7 @@ import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
 import org.scalatest.junit.JUnitSuite
 
-import scala.collection._
+import scala.collection.mutable
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future, Promise, TimeoutException}
 
@@ -138,7 +138,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val topicPartition = new TopicPartition("foo", 0)
     var offsetCommitErrors = Map.empty[TopicPartition, Errors]
     groupCoordinator.handleCommitOffsets(otherGroupId, memberId, 1,
-      immutable.Map(topicPartition -> OffsetAndMetadata(15L)), result => { offsetCommitErrors
= result })
+      Map(topicPartition -> OffsetAndMetadata(15L)), result => { offsetCommitErrors
= result })
     assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), offsetCommitErrors.get(topicPartition))
 
     // Heartbeat
@@ -155,7 +155,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsError)
 
     // DeleteGroups
-    val deleteGroupsErrors = groupCoordinator.handleDeleteGroups(immutable.Set(otherGroupId))
+    val deleteGroupsErrors = groupCoordinator.handleDeleteGroups(Set(otherGroupId))
     assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), deleteGroupsErrors.get(otherGroupId))
 
     // Check that non-loading groups are still accessible
@@ -452,7 +452,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     timer.advanceClock(sessionTimeout / 2)
 
     EasyMock.reset(replicaManager)
-    val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, immutable.Map(tp
-> offset))
+    val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, Map(tp
-> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     timer.advanceClock(sessionTimeout / 2 + 100)
@@ -820,7 +820,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val tp = new TopicPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
 
-    val commitOffsetResult = commitOffsets(groupId, memberId, generationId, immutable.Map(tp
-> offset))
+    val commitOffsetResult = commitOffsets(groupId, memberId, generationId, Map(tp ->
offset))
     assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
   }
 
@@ -830,7 +830,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offset = OffsetAndMetadata(0)
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
   }
 
@@ -856,7 +856,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val tp = new TopicPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -870,7 +870,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offset = OffsetAndMetadata(0)
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -879,13 +879,51 @@ class GroupCoordinatorTest extends JUnitSuite {
   }
 
   @Test
+  def testCommitAndFetchOffsetsWithEmptyGroup() {
+    // For backwards compatibility, the coordinator supports committing/fetching offsets
with an empty groupId.
+    // To allow inspection and removal of the empty group, we must also support DescribeGroups
and DeleteGroups
+
+    val tp = new TopicPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+    val groupId = ""
+
+    val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
+
+    val (fetchError, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+    assertEquals(Errors.NONE, fetchError)
+    assertEquals(Some(0), partitionData.get(tp).map(_.offset))
+
+    val (describeError, summary) = groupCoordinator.handleDescribeGroup(groupId)
+    assertEquals(Errors.NONE, describeError)
+    assertEquals(Empty.toString, summary.state)
+
+    val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val partition = EasyMock.niceMock(classOf[Partition])
+
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
+    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
+    EasyMock.replay(replicaManager, partition)
+
+    val deleteErrors = groupCoordinator.handleDeleteGroups(Set(groupId))
+    assertEquals(Errors.NONE, deleteErrors(groupId))
+
+    val (err, data) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+    assertEquals(Errors.NONE, err)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), data.get(tp).map(_.offset))
+  }
+
+  @Test
   def testBasicFetchTxnOffsets() {
     val tp = new TopicPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
     val producerId = 1000L
     val producerEpoch : Short = 2
 
-    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch,
immutable.Map(tp -> offset))
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch,
Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -912,7 +950,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val producerId = 1000L
     val producerEpoch : Short = 2
 
-    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch,
immutable.Map(tp -> offset))
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch,
Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -936,7 +974,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val producerId = 1000L
     val producerEpoch : Short = 2
 
-    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch,
immutable.Map(tp -> offset))
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch,
Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -975,16 +1013,16 @@ class GroupCoordinatorTest extends JUnitSuite {
 
     groupCoordinator.groupManager.addPartitionOwnership(offsetTopicPartitions(1).partition)
     val errors = mutable.ArrayBuffer[Errors]()
-    val partitionData = mutable.ArrayBuffer[Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
+    val partitionData = mutable.ArrayBuffer[scala.collection.Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
 
     val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]()
 
     // Ensure that the two groups map to different partitions.
     assertNotEquals(offsetTopicPartitions(0), offsetTopicPartitions(1))
 
-    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId, producerEpoch,
immutable.Map(partitions(0) -> offsets(0))))
+    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId, producerEpoch,
Map(partitions(0) -> offsets(0))))
     assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0)))
-    commitOffsetResults.append(commitTransactionalOffsets(otherGroupId, producerId, producerEpoch,
immutable.Map(partitions(1) -> offsets(1))))
+    commitOffsetResults.append(commitTransactionalOffsets(otherGroupId, producerId, producerEpoch,
Map(partitions(1) -> offsets(1))))
     assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
 
     // We got a commit for only one __consumer_offsets partition. We should only materialize
it's group offsets.
@@ -1051,16 +1089,16 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
 
     val errors = mutable.ArrayBuffer[Errors]()
-    val partitionData = mutable.ArrayBuffer[Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
+    val partitionData = mutable.ArrayBuffer[scala.collection.Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
 
     val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]()
 
     // producer0 commits the offsets for partition0
-    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(0), producerEpochs(0),
immutable.Map(partitions(0) -> offsets(0))))
+    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(0), producerEpochs(0),
Map(partitions(0) -> offsets(0))))
     assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0)))
 
     // producer1 commits the offsets for partition1
-    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(1), producerEpochs(1),
immutable.Map(partitions(1) -> offsets(1))))
+    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(1), producerEpochs(1),
Map(partitions(1) -> offsets(1))))
     assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
 
     // producer0 commits its transaction.
@@ -1123,7 +1161,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals((Errors.NONE, Map.empty), groupCoordinator.handleFetchOffsets(groupId))
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp1 -> offset1, tp2 ->
offset2, tp3 -> offset3))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp1 -> offset1, tp2 -> offset2,
tp3 -> offset3))
     assertEquals(Errors.NONE, commitOffsetResult(tp1))
     assertEquals(Errors.NONE, commitOffsetResult(tp2))
     assertEquals(Errors.NONE, commitOffsetResult(tp3))
@@ -1150,7 +1188,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
-    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, immutable.Map(tp
-> offset))
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, Map(tp
-> offset))
     assertEquals(Errors.REBALANCE_IN_PROGRESS, commitOffsetResult(tp))
   }
 
@@ -1332,20 +1370,20 @@ class GroupCoordinatorTest extends JUnitSuite {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     joinGroup(groupId, memberId, protocolType, protocols)
 
-    val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet)
+    val result = groupCoordinator.handleDeleteGroups(Set(groupId))
     assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NON_EMPTY_GROUP))
   }
 
   @Test
   def testDeleteGroupWithInvalidGroupId() {
-    val invalidGroupId = ""
-    val result = groupCoordinator.handleDeleteGroups(Set(invalidGroupId).toSet)
+    val invalidGroupId = null
+    val result = groupCoordinator.handleDeleteGroups(Set(invalidGroupId))
     assert(result.size == 1 && result.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID))
   }
 
   @Test
   def testDeleteGroupWithWrongCoordinator() {
-    val result = groupCoordinator.handleDeleteGroups(Set(otherGroupId).toSet)
+    val result = groupCoordinator.handleDeleteGroups(Set(otherGroupId))
     assert(result.size == 1 && result.contains(otherGroupId) && result.get(otherGroupId).contains(Errors.NOT_COORDINATOR))
   }
 
@@ -1367,7 +1405,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
-    val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet)
+    val result = groupCoordinator.handleDeleteGroups(Set(groupId))
     assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE))
   }
 
@@ -1388,7 +1426,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     EasyMock.reset(replicaManager)
     val tp = new TopicPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
-    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId,
immutable.Map(tp -> offset))
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId,
Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId)
@@ -1408,7 +1446,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
-    val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet)
+    val result = groupCoordinator.handleDeleteGroups(Set(groupId))
     assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE))
 
     assertEquals(Dead.toString, groupCoordinator.handleDescribeGroup(groupId)._2.state)
@@ -1535,7 +1573,7 @@ class GroupCoordinatorTest extends JUnitSuite {
                                   assignment: Map[String, Array[Byte]]): Future[SyncGroupCallbackParams]
= {
     val (responseFuture, responseCallback) = setupSyncGroupCallback
 
-    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+    val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse]
=> Unit] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
@@ -1616,10 +1654,10 @@ class GroupCoordinatorTest extends JUnitSuite {
   private def commitOffsets(groupId: String,
                             consumerId: String,
                             generationId: Int,
-                            offsets: immutable.Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams
= {
+                            offsets: Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams
= {
     val (responseFuture, responseCallback) = setupCommitOffsetsCallback
 
-    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+    val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse]
=> Unit] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
@@ -1646,10 +1684,10 @@ class GroupCoordinatorTest extends JUnitSuite {
   private def commitTransactionalOffsets(groupId: String,
                                          producerId: Long,
                                          producerEpoch: Short,
-                                         offsets: immutable.Map[TopicPartition, OffsetAndMetadata]):
CommitOffsetCallbackParams = {
+                                         offsets: Map[TopicPartition, OffsetAndMetadata]):
CommitOffsetCallbackParams = {
     val (responseFuture, responseCallback) = setupCommitOffsetsCallback
 
-    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+    val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse]
=> Unit] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message