kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6884; Consumer group command should use new admin client (#5032)
Date Tue, 17 Jul 2018 18:51:10 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8ec8ec5  KAFKA-6884; Consumer group command should use new admin client (#5032)
8ec8ec5 is described below

commit 8ec8ec542264604cc44ef5a17439573c0de0990a
Author: Attila Sasvari <asasvari@apache.org>
AuthorDate: Tue Jul 17 19:51:05 2018 +0100

    KAFKA-6884; Consumer group command should use new admin client (#5032)
    
    Reviewers: Viktor Somogyi <viktorsomogyi@gmail.com>, Vahid Hashemian <vahidhashemian@us.ibm.com>, Jason Gustafson <jason@confluent.io>
---
 .../scala/kafka/admin/ConsumerGroupCommand.scala   | 239 +++++++++++++--------
 .../SaslClientsWithInvalidCredentialsTest.scala    |  40 ++--
 .../kafka/admin/ConsumerGroupCommandTest.scala     |   3 +-
 .../kafka/admin/DeleteConsumerGroupsTest.scala     |  43 ++--
 .../kafka/admin/DescribeConsumerGroupTest.scala    |  56 ++---
 .../unit/kafka/admin/ListConsumerGroupTest.scala   |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  33 ++-
 7 files changed, 254 insertions(+), 162 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 435c2fd..48c2cff 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -18,23 +18,25 @@
 package kafka.admin
 
 import java.text.{ParseException, SimpleDateFormat}
+import java.util
 import java.util.{Date, Properties}
+
 import javax.xml.datatype.DatatypeFactory
 import joptsimple.{OptionParser, OptionSpec}
-
-
 import kafka.utils._
-import kafka.utils.Implicits._
-
-import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.{CommonClientConfigs, admin}
+import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
-import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 import scala.collection.{Seq, Set}
+import scala.concurrent.ExecutionException
+import scala.util.{Failure, Success, Try}
 
 object ConsumerGroupCommand extends Logging {
 
@@ -126,7 +128,11 @@ object ConsumerGroupCommand extends Logging {
     private var consumer: KafkaConsumer[String, String] = _
 
     def listGroups(): List[String] = {
-      adminClient.listAllConsumerGroupsFlattened().map(_.groupId)
+      val result = adminClient.listConsumerGroups(
+        withTimeoutMs(new ListConsumerGroupsOptions))
+
+        val listings = result.all.get.asScala
+        listings.map(_.groupId).toList
     }
 
     private def shouldPrintMemberState(group: String, state: Option[String], numRows: Option[Int]):
Boolean = {
@@ -297,57 +303,100 @@ object ConsumerGroupCommand extends Logging {
       }.toArray
     }
 
-    private[admin] def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]])
= {
-      val group = opts.options.valueOf(opts.groupOpt)
-      val consumerGroupSummary = adminClient.describeConsumerGroup(group, opts.options.valueOf(opts.timeoutMsOpt))
-      val assignments = consumerGroupSummary.consumers.map { consumers =>
-        var assignedTopicPartitions = Array[TopicPartition]()
-        val offsets = adminClient.listGroupOffsets(group)
-        val rowsWithConsumer =
-          if (offsets.isEmpty)
-            List[PartitionAssignmentState]()
-          else {
-            consumers.filter(_.assignment.nonEmpty).sortWith(_.assignment.size > _.assignment.size).flatMap
{ consumerSummary =>
-              val topicPartitions = consumerSummary.assignment
-              assignedTopicPartitions = assignedTopicPartitions ++ consumerSummary.assignment
-              val partitionOffsets: Map[TopicPartition, Option[Long]] = consumerSummary.assignment.map
{ topicPartition =>
-                new TopicPartition(topicPartition.topic, topicPartition.partition) ->
offsets.get(topicPartition)
-              }.toMap
-              collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), topicPartitions,
-                partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
-                Some(s"${consumerSummary.clientId}"))
-            }
-          }
+    def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = {
+      val groupId = opts.options.valueOf(opts.groupOpt)
+      val consumerGroups = adminClient.describeConsumerGroups(
+        util.Arrays.asList(groupId),
+        withTimeoutMs(new DescribeConsumerGroupsOptions)
+      ).describedGroups()
+
+      val group = consumerGroups.get(groupId).get
+      group.state.toString match {
+        case "Empty" | "Dead" =>
+          val partitionsToReset = getPartitionsToReset(groupId)
+          val preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset)
+
+          // Dry-run is the default behavior if --execute is not specified
+          val dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt)
+          if (!dryRun)
+            getConsumer.commitSync(preparedOffsets.asJava)
+          preparedOffsets
+        case currentState =>
+          printError(s"Assignments can only be reset if the group '$groupId' is inactive,
but the current state is $currentState.")
+          Map.empty
+      }
+    }
 
-        val rowsWithoutConsumer = offsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap
{
-          case (topicPartition, offset) =>
-            collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), Seq(topicPartition),
-              Map(topicPartition -> Some(offset)), Some(MISSING_COLUMN_VALUE),
-              Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
+    /**
+      * Returns the state of the specified consumer group and partition assignment states
+      */
+    def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]]) =
{
+      val groupId = opts.options.valueOf(opts.groupOpt)
+      val consumerGroup = adminClient.describeConsumerGroups(
+        List(groupId).asJava,
+        withTimeoutMs(new DescribeConsumerGroupsOptions())
+      ).describedGroups.get(groupId).get
+
+      val state = consumerGroup.state
+      val committedOffsets = getCommittedOffsets(groupId).asScala.toMap
+      var assignedTopicPartitions = ListBuffer[TopicPartition]()
+      val rowsWithConsumer = if (committedOffsets.isEmpty) List[PartitionAssignmentState]()
else consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
+        .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size)
+        .flatMap { consumerSummary =>
+          val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
+          assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
+          val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
+            .map { topicPartition =>
+              topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
+            }.toMap
+
+        collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
+          partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
+          Some(s"${consumerSummary.clientId}"))
         }
 
-        rowsWithConsumer ++ rowsWithoutConsumer
+      val rowsWithoutConsumer = committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap
{
+        case (topicPartition, offset) =>
+          collectConsumerAssignment(
+            groupId,
+            Option(consumerGroup.coordinator),
+            Seq(topicPartition),
+            Map(topicPartition -> Some(offset.offset)),
+                                  Some(MISSING_COLUMN_VALUE),
+                                  Some(MISSING_COLUMN_VALUE),
+                                  Some(MISSING_COLUMN_VALUE))
       }
 
-      (Some(consumerGroupSummary.state), assignments)
+      (Some(state.toString), Some(rowsWithConsumer ++ rowsWithoutConsumer))
     }
 
     private[admin] def collectGroupMembers(verbose: Boolean): (Option[String], Option[Seq[MemberAssignmentState]])
= {
-      val group = opts.options.valueOf(opts.groupOpt)
-      val consumerGroupSummary = adminClient.describeConsumerGroup(group, opts.options.valueOf(opts.timeoutMsOpt))
-      (Some(consumerGroupSummary.state),
-        consumerGroupSummary.consumers.map(_.map {
-          consumer => MemberAssignmentState(group, consumer.consumerId, consumer.host,
consumer.clientId, consumer.assignment.length,
-            if (verbose) consumer.assignment else List())
-        })
-      )
+      val groupId = opts.options.valueOf(opts.groupOpt)
+      val consumerGroups = adminClient.describeConsumerGroups(
+        List(groupId).asJava,
+        withTimeoutMs(new DescribeConsumerGroupsOptions)
+      ).describedGroups()
+
+      val group = consumerGroups.get(groupId).get
+      val state = group.state
+
+      (Some(state.toString),
+        Option(group.members().asScala.map {
+          consumer => MemberAssignmentState(groupId, consumer.consumerId, consumer.host,
consumer.clientId, consumer.assignment.topicPartitions.size(),
+            if (verbose) consumer.assignment.topicPartitions.asScala.toList else List())
+        }.toList))
     }
 
     private[admin] def collectGroupState(): GroupState = {
-      val group = opts.options.valueOf(opts.groupOpt)
-      val consumerGroupSummary = adminClient.describeConsumerGroup(group, opts.options.valueOf(opts.timeoutMsOpt))
-      GroupState(group, consumerGroupSummary.coordinator, consumerGroupSummary.assignmentStrategy,
-        consumerGroupSummary.state, consumerGroupSummary.consumers.get.size)
+      val groupId = opts.options.valueOf(opts.groupOpt)
+      val consumerGroups = adminClient.describeConsumerGroups(
+        util.Arrays.asList(groupId),
+        withTimeoutMs(new DescribeConsumerGroupsOptions)
+      ).describedGroups()
+
+      val group = consumerGroups.get(groupId).get
+      GroupState(groupId, group.coordinator, group.partitionAssignor(),
+        group.state.toString, group.members().size)
     }
 
     private def getLogEndOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition,
LogOffsetResult] = {
@@ -385,19 +434,19 @@ object ConsumerGroupCommand extends Logging {
       if (consumer != null) consumer.close()
     }
 
-    private def createAdminClient(): AdminClient = {
+    private def createAdminClient(): admin.AdminClient = {
       val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
else new Properties()
       props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
-      AdminClient.create(props)
+      admin.AdminClient.create(props)
     }
 
-    private def getConsumer: KafkaConsumer[String, String] = {
+    private def getConsumer = {
       if (consumer == null)
-        consumer = createConsumer
+        consumer = createConsumer()
       consumer
     }
 
-    private def createConsumer: KafkaConsumer[String, String] = {
+    private def createConsumer(): KafkaConsumer[String, String] = {
       val properties = new Properties()
       val deserializer = (new StringDeserializer).getClass.getName
       val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt)
@@ -407,29 +456,19 @@ object ConsumerGroupCommand extends Logging {
       properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
       properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
       properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
-      if (opts.options.has(opts.commandConfigOpt))
-        properties ++= Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+
+      if (opts.options.has(opts.commandConfigOpt)) {
+        Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)).asScala.foreach {
+          case (k,v) => properties.put(k, v)
+        }
+      }
 
       new KafkaConsumer(properties)
     }
 
-    def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = {
-      val groupId = opts.options.valueOf(opts.groupOpt)
-      val consumerGroupSummary = adminClient.describeConsumerGroup(groupId, opts.options.valueOf(opts.timeoutMsOpt))
-      consumerGroupSummary.state match {
-        case "Empty" | "Dead" =>
-          val partitionsToReset = getPartitionsToReset(groupId)
-          val preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset)
-
-          // Dry-run is the default behavior if --execute is not specified
-          val dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt)
-          if (!dryRun)
-            getConsumer.commitSync(preparedOffsets.asJava)
-          preparedOffsets
-        case currentState =>
-          printError(s"Assignments can only be reset if the group '$groupId' is inactive,
but the current state is $currentState.")
-          Map.empty
-      }
+    private def withTimeoutMs [T <: AbstractOptions[T]] (options : T) =  {
+      val t = opts.options.valueOf(opts.timeoutMsOpt).intValue()
+      options.timeoutMs(t)
     }
 
     private def parseTopicPartitionsToReset(topicArgs: Seq[String]): Seq[TopicPartition]
= topicArgs.flatMap {
@@ -443,7 +482,7 @@ object ConsumerGroupCommand extends Logging {
 
     private def getPartitionsToReset(groupId: String): Seq[TopicPartition] = {
       if (opts.options.has(opts.allTopicsOpt)) {
-        val allTopicPartitions = adminClient.listGroupOffsets(groupId).keys.toSeq
+        val allTopicPartitions = getCommittedOffsets(groupId).keySet().asScala.toSeq
         allTopicPartitions
       } else if (opts.options.has(opts.topicOpt)) {
         val topics = opts.options.valuesOf(opts.topicOpt).asScala
@@ -456,6 +495,13 @@ object ConsumerGroupCommand extends Logging {
       }
     }
 
+    private def getCommittedOffsets(groupId: String) = {
+      adminClient.listConsumerGroupOffsets(
+        groupId,
+        withTimeoutMs(new ListConsumerGroupOffsetsOptions)
+      ).partitionsToOffsetAndMetadata.get
+    }
+
     private def parseResetPlan(resetPlanCsv: String): Map[TopicPartition, OffsetAndMetadata]
= {
       resetPlanCsv.split("\n")
         .map { line =>
@@ -489,11 +535,11 @@ object ConsumerGroupCommand extends Logging {
           }
         }.toMap
       } else if (opts.options.has(opts.resetShiftByOpt)) {
-        val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
+        val currentCommittedOffsets = getCommittedOffsets(groupId)
         val requestedOffsets = partitionsToReset.map { topicPartition =>
           val shiftBy = opts.options.valueOf(opts.resetShiftByOpt)
-          val currentOffset = currentCommittedOffsets.getOrElse(topicPartition,
-            throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition
since there is no current committed offset"))
+          val currentOffset = currentCommittedOffsets.asScala.getOrElse(topicPartition,
+            throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition
since there is no current committed offset")).offset
           (topicPartition, currentOffset + shiftBy)
         }.toMap
         checkOffsetsRange(requestedOffsets).map {
@@ -528,19 +574,19 @@ object ConsumerGroupCommand extends Logging {
         val resetPlanCsv = Utils.readFileAsString(resetPlanPath)
         val resetPlan = parseResetPlan(resetPlanCsv)
         val requestedOffsets = resetPlan.keySet.map { topicPartition =>
-          (topicPartition, resetPlan(topicPartition).offset())
+          (topicPartition, resetPlan(topicPartition).offset)
         }.toMap
         checkOffsetsRange(requestedOffsets).map {
           case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
         }
       } else if (opts.options.has(opts.resetToCurrentOpt)) {
-        val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
+        val currentCommittedOffsets = getCommittedOffsets(groupId)
         val (partitionsToResetWithCommittedOffset, partitionsToResetWithoutCommittedOffset)
=
           partitionsToReset.partition(currentCommittedOffsets.keySet.contains(_))
 
         val preparedOffsetsForPartitionsWithCommittedOffset = partitionsToResetWithCommittedOffset.map
{ topicPartition =>
           (topicPartition, new OffsetAndMetadata(currentCommittedOffsets.get(topicPartition)
match {
-            case Some(offset) => offset
+            case offset if offset != null => offset.offset
             case _ => throw new IllegalStateException(s"Expected a valid current offset
for topic partition: $topicPartition")
           }))
         }.toMap
@@ -584,26 +630,37 @@ object ConsumerGroupCommand extends Logging {
       rows.foldRight("")(_ + "\n" + _)
     }
 
-    def deleteGroups(): Map[String, Errors] = {
+    def deleteGroups(): Map[String, Throwable] = {
       val groupsToDelete = opts.options.valuesOf(opts.groupOpt).asScala.toList
-      val result = adminClient.deleteConsumerGroups(groupsToDelete)
-      val successfullyDeleted = result.filter {
-        case (_, error) => error == Errors.NONE
-      }.keySet
+      val deletedGroups = adminClient.deleteConsumerGroups(
+        groupsToDelete.asJava,
+        withTimeoutMs(new DeleteConsumerGroupsOptions)
+      ).deletedGroups().asScala
+
+      val result = deletedGroups.mapValues { f =>
+        Try(f.get) match {
+          case _: Success[_] => null
+          case Failure(e) => e
+        }
+      }
 
-      if (successfullyDeleted.size == result.size)
-        println(s"Deletion of requested consumer groups (${successfullyDeleted.mkString("'",
"', '", "'")}) was successful.")
+      val (success, failed) = result.partition {
+        case (_, error) => error == null
+      }
+
+      if (failed.isEmpty) {
+        println(s"Deletion of requested consumer groups (${success.keySet.mkString("'", "',
'", "'")}) was successful.")
+      }
       else {
         printError("Deletion of some consumer groups failed:")
-        result.foreach {
-          case (group, error) if error != Errors.NONE => println(s"* Group '$group' could
not be deleted due to: ${error.toString}")
-          case _ => // no need to print successful deletions individually
+        failed.foreach {
+          case (group, error) => println(s"* Group '$group' could not be deleted due to:
${error.toString}")
         }
-        if (successfullyDeleted.nonEmpty)
-          println(s"\nThese consumer groups were deleted successfully: ${successfullyDeleted.mkString("'",
"', '", "'")}")
+        if (success.nonEmpty)
+          println(s"\nThese consumer groups were deleted successfully: ${success.keySet.mkString("'",
"', '", "'")}")
       }
 
-      result
+      result.toMap
     }
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index 3c2b353..b58ba74 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -13,6 +13,7 @@
 package kafka.api
 
 import java.nio.file.Files
+import java.time.Duration
 import java.util.Collections
 import java.util.concurrent.{ExecutionException, TimeUnit}
 
@@ -120,12 +121,12 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness
with
   }
 
   private def verifyConsumerWithAuthenticationFailure(consumer: KafkaConsumer[Array[Byte],
Array[Byte]]) {
-    verifyAuthenticationException(consumer.poll(10000))
+    verifyAuthenticationException(consumer.poll(Duration.ofMillis(1000)))
     verifyAuthenticationException(consumer.partitionsFor(topic))
 
     createClientCredential()
     verifyWithRetry(sendOneRecord())
-    verifyWithRetry(assertEquals(1, consumer.poll(1000).count))
+    verifyWithRetry(assertEquals(1, consumer.poll(Duration.ofMillis(1000)).count))
   }
 
   @Test
@@ -158,6 +159,29 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness
with
 
   @Test
   def testConsumerGroupServiceWithAuthenticationFailure() {
+    val consumerGroupService: ConsumerGroupService = prepareConsumerGroupService
+
+    val consumer = consumers.head
+    consumer.subscribe(List(topic).asJava)
+
+    verifyAuthenticationException(consumerGroupService.listGroups)
+    consumerGroupService.close()
+  }
+
+  @Test
+  def testConsumerGroupServiceWithAuthenticationSuccess() {
+    createClientCredential()
+    val consumerGroupService: ConsumerGroupService = prepareConsumerGroupService
+
+    val consumer = consumers.head
+    consumer.subscribe(List(topic).asJava)
+
+    verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))
+    assertEquals(1, consumerGroupService.listGroups.size)
+    consumerGroupService.close()
+  }
+
+  private def prepareConsumerGroupService = {
     val propsFile = TestUtils.tempFile()
     val propsStream = Files.newOutputStream(propsFile.toPath)
     propsStream.write("security.protocol=SASL_PLAINTEXT\n".getBytes())
@@ -170,14 +194,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness
with
                         "--command-config", propsFile.getAbsolutePath)
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupService = new ConsumerGroupService(opts)
-
-    val consumer = consumers.head
-    consumer.subscribe(List(topic).asJava)
-
-    verifyAuthenticationException(consumerGroupService.listGroups)
-    createClientCredential()
-    verifyWithRetry(consumer.poll(1000))
-    assertEquals(1, consumerGroupService.listGroups.size)
+    consumerGroupService
   }
 
   private def createClientCredential(): Unit = {
@@ -203,11 +220,10 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness
with
       action
       fail("Expected an authentication exception")
     } catch {
-      case e: SaslAuthenticationException =>
+      case e : Exception =>
         // expected exception
         val elapsedMs = System.currentTimeMillis - startMs
         assertTrue(s"Poll took too long, elapsed=$elapsedMs", elapsedMs <= 5000)
-        assertTrue(s"Exception message not useful: $e", e.getMessage.contains("invalid credentials"))
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index 2bed673..cf00e93 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.admin
 
+import java.time.Duration
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 import java.util.{Collections, Properties}
 
@@ -131,7 +132,7 @@ object ConsumerGroupCommandTest {
       try {
         subscribe()
         while (true)
-          consumer.poll(Long.MaxValue)
+          consumer.poll(Duration.ofMillis(Long.MaxValue))
       } catch {
         case _: WakeupException => // OK
       } finally {
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
index aae48d1..f24d8d1 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -18,6 +18,7 @@ package kafka.admin
 
 import joptsimple.OptionException
 import kafka.utils.TestUtils
+import org.apache.kafka.common.errors.{GroupIdNotFoundException, GroupNotEmptyException}
 import org.apache.kafka.common.protocol.Errors
 import org.junit.Assert._
 import org.junit.Test
@@ -41,7 +42,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     val output = TestUtils.grabConsoleOutput(service.deleteGroups())
     assertTrue(s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected while
deleting consumer group",
-        output.contains(s"Group '$missingGroup' could not be deleted due to: ${Errors.GROUP_ID_NOT_FOUND.toString}"))
+        output.contains(s"Group '$missingGroup' could not be deleted due to:") &&
output.contains(Errors.GROUP_ID_NOT_FOUND.message))
   }
 
   @Test
@@ -55,7 +56,8 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     val result = service.deleteGroups()
     assertTrue(s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected while
deleting consumer group",
-      result.size == 1 && result.keySet.contains(missingGroup) && result.get(missingGroup).contains(Errors.GROUP_ID_NOT_FOUND))
+      result.size == 1 && result.keySet.contains(missingGroup) && result.get(missingGroup).get.getCause
+        .isInstanceOf[GroupIdNotFoundException])
   }
 
   @Test
@@ -69,11 +71,11 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     TestUtils.waitUntilTrue(() => {
       service.listGroups().contains(group)
-    }, "The group did not initialize as expected.")
+    }, "The group did not initialize as expected.", maxRetries = 3)
 
     val output = TestUtils.grabConsoleOutput(service.deleteGroups())
     assertTrue(s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while deleting
consumer group",
-      output.contains(s"Group '$group' could not be deleted due to: ${Errors.NON_EMPTY_GROUP}"))
+      output.contains(s"Group '$group' could not be deleted due to:") && output.contains(Errors.NON_EMPTY_GROUP.message))
   }
 
   @Test
@@ -87,11 +89,11 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     TestUtils.waitUntilTrue(() => {
       service.listGroups().contains(group)
-    }, "The group did not initialize as expected.")
+    }, "The group did not initialize as expected.", maxRetries = 3)
 
     val result = service.deleteGroups()
     assertTrue(s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while deleting
consumer group",
-      result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NON_EMPTY_GROUP))
+      result.size == 1 && result.keySet.contains(group) && result.get(group).get.getCause.isInstanceOf[GroupNotEmptyException])
   }
 
   @Test
@@ -105,13 +107,13 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     TestUtils.waitUntilTrue(() => {
       service.listGroups().contains(group)
-    }, "The group did not initialize as expected.")
+    }, "The group did not initialize as expected.", maxRetries = 3)
 
     executor.shutdown()
 
     TestUtils.waitUntilTrue(() => {
       service.collectGroupState().state == "Empty"
-    }, "The group did become empty as expected.")
+    }, "The group did become empty as expected.", maxRetries = 3)
 
     val output = TestUtils.grabConsoleOutput(service.deleteGroups())
     assertTrue(s"The consumer group could not be deleted as expected",
@@ -129,17 +131,17 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     TestUtils.waitUntilTrue(() => {
       service.listGroups().contains(group)
-    }, "The group did not initialize as expected.")
+    }, "The group did not initialize as expected.", maxRetries = 3)
 
     executor.shutdown()
 
     TestUtils.waitUntilTrue(() => {
       service.collectGroupState().state == "Empty"
-    }, "The group did become empty as expected.")
+    }, "The group did become empty as expected.", maxRetries = 3)
 
     val result = service.deleteGroups()
     assertTrue(s"The consumer group could not be deleted as expected",
-      result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NONE))
+      result.size == 1 && result.keySet.contains(group) && result.get(group).get
== null)
   }
 
   @Test
@@ -154,18 +156,18 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     TestUtils.waitUntilTrue(() => {
       service.listGroups().contains(group)
-    }, "The group did not initialize as expected.")
+    }, "The group did not initialize as expected.", maxRetries = 3)
 
     executor.shutdown()
 
     TestUtils.waitUntilTrue(() => {
       service.collectGroupState().state == "Empty"
-    }, "The group did become empty as expected.")
+    }, "The group did become empty as expected.", maxRetries = 3)
 
     val service2 = getConsumerGroupService(cgcArgs ++ Array("--group", missingGroup))
     val output = TestUtils.grabConsoleOutput(service2.deleteGroups())
     assertTrue(s"The consumer group deletion did not work as expected",
-      output.contains(s"Group '$missingGroup' could not be deleted due to: ${Errors.GROUP_ID_NOT_FOUND}")
&&
+      output.contains(s"Group '$missingGroup' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message)
&&
       output.contains(s"These consumer groups were deleted successfully: '$group'"))
   }
 
@@ -181,20 +183,20 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     TestUtils.waitUntilTrue(() => {
       service.listGroups().contains(group)
-    }, "The group did not initialize as expected.")
+    }, "The group did not initialize as expected.", maxRetries = 3)
 
     executor.shutdown()
 
     TestUtils.waitUntilTrue(() => {
       service.collectGroupState().state == "Empty"
-    }, "The group did become empty as expected.")
+    }, "The group did become empty as expected.", maxRetries = 3)
 
     val service2 = getConsumerGroupService(cgcArgs ++ Array("--group", missingGroup))
     val result = service2.deleteGroups()
     assertTrue(s"The consumer group deletion did not work as expected",
       result.size == 2 &&
-        result.keySet.contains(group) && result.get(group).contains(Errors.NONE)
&&
-        result.keySet.contains(missingGroup) && result.get(missingGroup).contains(Errors.GROUP_ID_NOT_FOUND))
+        result.keySet.contains(group) && result.get(group).get == null &&
+        result.keySet.contains(missingGroup) && result.get(missingGroup).get.getMessage.contains(Errors.GROUP_ID_NOT_FOUND.message))
   }
 
   @Test
@@ -206,7 +208,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     val output = TestUtils.grabConsoleOutput(service.deleteGroups())
     assertTrue(s"The consumer group deletion did not work as expected",
-      output.contains(s"Group '$group' could not be deleted due to: ${Errors.COORDINATOR_NOT_AVAILABLE}"))
+      output.contains(s"Group '$group' could not be deleted due to"))
   }
 
   @Test
@@ -218,8 +220,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     val result = service.deleteGroups()
     assertTrue(s"The consumer group deletion did not work as expected",
-      result.size == 1 &&
-        result.keySet.contains(group) && result.get(group).contains(Errors.COORDINATOR_NOT_AVAILABLE))
+      result.size == 1 && result.keySet.contains(group))
   }
 
   @Test(expected = classOf[OptionException])
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 6c2b09b..fac34a7 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -20,10 +20,11 @@ import joptsimple.OptionException
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.RoundRobinAssignor
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.TimeoutException
+import org.apache.kafka.common.errors.{TimeoutException}
 import org.junit.Assert._
 import org.junit.Test
 
+import scala.concurrent.ExecutionException
 import scala.util.Random
 
 class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
@@ -121,7 +122,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       TestUtils.waitUntilTrue(() => {
         val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroup())
         output.trim.split("\n").length == 2 && error.isEmpty
-      }, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.")
+      }, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.", maxRetries = 3)
     }
   }
 
@@ -143,7 +144,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
         assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
         assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
-    }, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for group $group.")
+    }, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for group $group.", maxRetries = 3)
   }
 
   @Test
@@ -167,7 +168,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
           case None =>
             false
         })
-    }, s"Expected a 'Stable' group status, rows and valid member information for group $group.")
+    }, s"Expected a 'Stable' group status, rows and valid member information for group $group.", maxRetries = 3)
 
     val (_, assignments) = service.collectGroupMembers(true)
     assignments match {
@@ -196,7 +197,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         state.assignmentStrategy == "range" &&
         state.coordinator != null &&
         servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
-    }, s"Expected a 'Stable' group status, with one member and round robin assignment strategy for group $group.")
+    }, s"Expected a 'Stable' group status, with one member and round robin assignment strategy for group $group.", maxRetries = 3)
   }
 
   @Test
@@ -215,7 +216,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         state.assignmentStrategy == "roundrobin" &&
         state.coordinator != null &&
         servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
-    }, s"Expected a 'Stable' group status, with one member and round robin assignment strategy for group $group.")
+    }, s"Expected a 'Stable' group status, with one member and round robin assignment strategy for group $group.", maxRetries = 3)
   }
 
   @Test
@@ -232,14 +233,13 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       TestUtils.waitUntilTrue(() => {
         val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroup())
         output.trim.split("\n").length == 2 && error.isEmpty
-      }, s"Expected describe group results with one data row for describe type '${describeType.mkString(" ")}'")
+      }, s"Expected describe group results with one data row for describe type '${describeType.mkString(" ")}'", maxRetries = 3)
 
       // stop the consumer so the group has no active member anymore
       executor.shutdown()
-
       TestUtils.waitUntilTrue(() => {
         TestUtils.grabConsoleError(service.describeGroup()).contains(s"Consumer group '$group' has no active members.")
-      }, s"Expected no active member in describe group results with describe type ${describeType.mkString(" ")}")
+      }, s"Expected no active member in describe group results with describe type ${describeType.mkString(" ")}", maxRetries = 3)
     }
   }
 
@@ -256,7 +256,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     TestUtils.waitUntilTrue(() => {
       val (state, assignments) = service.collectGroupOffsets()
       state.contains("Stable") && assignments.exists(_.exists(_.group == group))
-    }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.")
+    }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.", maxRetries = 3)
 
     // stop the consumer so the group has no active member anymore
     executor.shutdown()
@@ -289,7 +289,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     TestUtils.waitUntilTrue(() => {
       val (state, assignments) = service.collectGroupMembers(false)
       state.contains("Stable") && assignments.exists(_.exists(_.group == group))
-    }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.")
+    }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.", maxRetries = 3)
 
     // stop the consumer so the group has no active member anymore
     executor.shutdown()
@@ -297,7 +297,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     TestUtils.waitUntilTrue(() => {
       val (state, assignments) = service.collectGroupMembers(false)
       state.contains("Empty") && assignments.isDefined && assignments.get.isEmpty
-    }, s"Expected no member in describe group members results for group '$group'")
+    }, s"Expected no member in describe group members results for group '$group'", maxRetries = 3)
   }
 
   @Test
@@ -316,7 +316,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         state.numMembers == 1 &&
         state.coordinator != null &&
         servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
-    }, s"Expected the group '$group' to initially become stable, and have a single member.")
+    }, s"Expected the group '$group' to initially become stable, and have a single member.", maxRetries = 3)
 
     // stop the consumer so the group has no active member anymore
     executor.shutdown()
@@ -324,7 +324,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     TestUtils.waitUntilTrue(() => {
       val state = service.collectGroupState()
       state.state == "Empty" && state.numMembers == 0 && state.assignmentStrategy == ""
-    }, s"Expected the group '$group' to become empty after the only member leaving.")
+    }, s"Expected the group '$group' to become empty after the only member leaving.", maxRetries = 3)
   }
 
   @Test
@@ -342,7 +342,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroup())
         val expectedNumRows = if (describeTypeMembers.contains(describeType)) 3 else 2
         error.isEmpty && output.trim.split("\n").size == expectedNumRows
-      }, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'")
+      }, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'", maxRetries = 3)
     }
   }
 
@@ -362,7 +362,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 1 &&
         assignments.get.count { x => x.group == group && x.partition.isDefined } == 1
-    }, "Expected rows for consumers with no assigned partitions in describe group results")
+    }, "Expected rows for consumers with no assigned partitions in describe group results", maxRetries = 3)
   }
 
   @Test
@@ -383,7 +383,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         assignments.get.count { x => x.group == group && x.numPartitions == 1 } == 1 &&
         assignments.get.count { x => x.group == group && x.numPartitions == 0 } == 1 &&
         assignments.get.count(_.assignment.nonEmpty) == 0
-    }, "Expected rows for consumers with no assigned partitions in describe group results")
+    }, "Expected rows for consumers with no assigned partitions in describe group results", maxRetries = 3)
 
     val (state, assignments) = service.collectGroupMembers(true)
     assertTrue("Expected additional columns in verbose version of describe members",
@@ -403,7 +403,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     TestUtils.waitUntilTrue(() => {
       val state = service.collectGroupState()
       state.state == "Stable" && state.numMembers == 2
-    }, "Expected two consumers in describe group results")
+    }, "Expected two consumers in describe group results", maxRetries = 3)
   }
 
   @Test
@@ -423,7 +423,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroup())
         val expectedNumRows = if (describeTypeState.contains(describeType)) 2 else 3
         error.isEmpty && output.trim.split("\n").size == expectedNumRows
-      }, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'")
+      }, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'", maxRetries = 3)
     }
   }
 
@@ -446,7 +446,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         assignments.get.count(_.group == group) == 2 &&
         assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 &&
         assignments.get.count{ x => x.group == group && x.partition.isEmpty} == 0
-    }, "Expected two rows (one row per consumer) in describe group results.")
+    }, "Expected two rows (one row per consumer) in describe group results.", maxRetries = 3)
   }
 
   @Test
@@ -468,7 +468,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         assignments.get.count(_.group == group) == 2 &&
         assignments.get.count{ x => x.group == group && x.numPartitions == 1 } == 2 &&
         assignments.get.count{ x => x.group == group && x.numPartitions == 0 } == 0
-    }, "Expected two rows (one row per consumer) in describe group members results.")
+    }, "Expected two rows (one row per consumer) in describe group members results.", maxRetries = 3)
 
     val (state, assignments) = service.collectGroupMembers(true)
     assertTrue("Expected additional columns in verbose version of describe members",
@@ -490,7 +490,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     TestUtils.waitUntilTrue(() => {
       val state = service.collectGroupState()
       state.state == "Stable" && state.group == group && state.numMembers == 2
-    }, "Expected a stable group with two members in describe group state result.")
+    }, "Expected a stable group with two members in describe group state result.", maxRetries = 3)
   }
 
   @Test
@@ -508,7 +508,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     TestUtils.waitUntilTrue(() => {
       val (state, assignments) = service.collectGroupOffsets()
       state.contains("Empty") && assignments.isDefined && assignments.get.count(_.group == group) == 2
-    }, "Expected a stable group with two members in describe group state result.")
+    }, "Expected a stable group with two members in describe group state result.", maxRetries = 3)
   }
 
   @Test
@@ -528,7 +528,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       TestUtils.grabConsoleOutputAndError(service.describeGroup())
       fail(s"The consumer group command should have failed due to low initialization timeout (describe type: ${describeType.mkString(" ")})")
     } catch {
-      case _: TimeoutException => // OK
+      case e: ExecutionException => assert(e.getCause.isInstanceOf[TimeoutException]) // OK
     }
   }
 
@@ -548,7 +548,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       service.collectGroupOffsets()
       fail("The consumer group command should fail due to low initialization timeout")
     } catch {
-      case _: TimeoutException => // OK
+      case e : ExecutionException => assert(e.getCause.isInstanceOf[TimeoutException]) // OK
     }
   }
 
@@ -568,12 +568,12 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       service.collectGroupMembers(false)
       fail("The consumer group command should fail due to low initialization timeout")
     } catch {
-      case _: TimeoutException => // OK
+      case e: ExecutionException => assert(e.getCause.isInstanceOf[TimeoutException])// OK
         try {
           service.collectGroupMembers(true)
           fail("The consumer group command should fail due to low initialization timeout (verbose)")
         } catch {
-          case _: TimeoutException => // OK
+          case e: ExecutionException => assert(e.getCause.isInstanceOf[TimeoutException]) // OK
         }
     }
   }
@@ -594,7 +594,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       service.collectGroupState()
       fail("The consumer group command should fail due to low initialization timeout")
     } catch {
-      case _: TimeoutException => // OK
+      case e: ExecutionException => assert(e.getCause.isInstanceOf[TimeoutException]) // OK
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
index 32f6614..1a35c4c 100644
--- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -36,7 +36,7 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
     TestUtils.waitUntilTrue(() => {
       foundGroups = service.listGroups().toSet
       expectedGroups == foundGroups
-    }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.")
+    }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.", maxRetries = 3)
   }
 
   @Test(expected = classOf[OptionException])
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7b68cc0..21205ed 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -43,6 +43,7 @@ import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetA
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{KafkaFuture, TopicPartition}
 import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.RetriableException
 import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.{ListenerName, Mode}
@@ -772,17 +773,33 @@ object TestUtils extends Logging {
   }
 
   /**
-   * Wait until the given condition is true or throw an exception if the given wait time elapses.
-   */
+    *  Wait until the given condition is true or throw an exception if the given wait time elapses.
+    *
+    * @param condition condition to check
+    * @param msg error message
+    * @param waitTime maximum time to wait and retest the condition before failing the test
+    * @param pause delay between condition checks
+    * @param maxRetries maximum number of retries to check the given condition if a retriable exception is thrown
+    */
   def waitUntilTrue(condition: () => Boolean, msg: => String,
-                    waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L): Unit = {
+                    waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L, maxRetries: Int = 0): Unit = {
     val startTime = System.currentTimeMillis()
+    var retry = 0
     while (true) {
-      if (condition())
-        return
-      if (System.currentTimeMillis() > startTime + waitTime)
-        fail(msg)
-      Thread.sleep(waitTime.min(pause))
+      try {
+        if (condition())
+          return
+        if (System.currentTimeMillis() > startTime + waitTime)
+          fail(msg)
+        Thread.sleep(waitTime.min(pause))
+      }
+      catch {
+        case e: RetriableException if retry < maxRetries => {
+          debug("Retrying after error", e)
+          retry += 1
+        }
+        case e : Throwable => throw e
+      }
     }
     // should never hit here
     throw new RuntimeException("unexpected error")


Mime
View raw message