kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7091; AdminClient should handle FindCoordinatorResponse errors (#5278)
Date Wed, 27 Jun 2018 22:09:48 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 51935ee  KAFKA-7091; AdminClient should handle FindCoordinatorResponse errors (#5278)
51935ee is described below

commit 51935ee2e6dc473415b6708b60c9d47410876997
Author: Manikumar Reddy O <manikumar.reddy@gmail.com>
AuthorDate: Thu Jun 28 03:39:44 2018 +0530

    KAFKA-7091; AdminClient should handle FindCoordinatorResponse errors (#5278)
    
    - Update KafkaAdminClient implementation to handle FindCoordinatorResponse errors
    - Remove scala AdminClient usage from core and streams tests
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Jason Gustafson <jason@confluent.io>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 28 +++++---
 .../common/requests/FindCoordinatorResponse.java   |  8 ++-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 12 ++++
 .../kafka/api/AdminClientIntegrationTest.scala     | 15 +---
 .../kafka/api/AuthorizerIntegrationTest.scala      | 25 +++----
 .../integration/kafka/api/ConsumerBounceTest.scala | 83 ++++++++++++++++------
 .../api/SaslSslAdminClientIntegrationTest.scala    |  2 +
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 15 +++-
 .../integration/AbstractResetIntegrationTest.java  | 21 ++----
 9 files changed, 136 insertions(+), 73 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 62b6b6e..7e245d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2395,16 +2395,9 @@ public class KafkaAdminClient extends AdminClient {
                 @Override
                 void handleResponse(AbstractResponse abstractResponse) {
                     final FindCoordinatorResponse fcResponse = (FindCoordinatorResponse) abstractResponse;
-                    Errors error = fcResponse.error();
-                    if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
-                        // Retry COORDINATOR_NOT_AVAILABLE, in case the error is temporary.
-                        throw error.exception();
-                    } else if (error != Errors.NONE) {
-                        // All other errors are immediate failures.
-                        KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
-                        future.completeExceptionally(error.exception());
+
+                    if (handleFindCoordinatorError(fcResponse, futures.get(groupId)))
                         return;
-                    }
 
                     final long nowDescribeConsumerGroups = time.milliseconds();
                     final int nodeId = fcResponse.node().id();
@@ -2476,6 +2469,17 @@ public class KafkaAdminClient extends AdminClient {
         return new DescribeConsumerGroupsResult(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(futures));
     }
 
+    private boolean handleFindCoordinatorError(FindCoordinatorResponse response, KafkaFutureImpl<?> future) {
+        Errors error = response.error();
+        if (error.exception() instanceof RetriableException) {
+            throw error.exception();
+        } else if (response.hasError()) {
+            future.completeExceptionally(error.exception());
+            return true;
+        }
+        return false;
+    }
+
     private final static class ListConsumerGroupsResults {
         private final List<Throwable> errors;
         private final HashMap<String, ConsumerGroupListing> listings;
@@ -2610,6 +2614,9 @@ public class KafkaAdminClient extends AdminClient {
             void handleResponse(AbstractResponse abstractResponse) {
                 final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
 
+                if (handleFindCoordinatorError(response, groupOffsetListingFuture))
+                    return;
+
                 final long nowListConsumerGroupOffsets = time.milliseconds();
 
                 final int nodeId = response.node().id();
@@ -2696,6 +2703,9 @@ public class KafkaAdminClient extends AdminClient {
                 void handleResponse(AbstractResponse abstractResponse) {
                     final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
 
+                    if (handleFindCoordinatorError(response, futures.get(groupId)))
+                        return;
+
                     final long nowDeleteConsumerGroups = time.milliseconds();
 
                     final int nodeId = response.node().id();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index 39726da..bc7f654 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -68,9 +68,11 @@ public class FindCoordinatorResponse extends AbstractResponse {
     /**
      * Possible error codes:
      *
+     * COORDINATOR_LOAD_IN_PROGRESS (14)
      * COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR (16)
      * GROUP_AUTHORIZATION_FAILED (30)
+     * INVALID_REQUEST (42)
+     * TRANSACTIONAL_ID_AUTHORIZATION_FAILED (53)
      */
 
 
@@ -107,6 +109,10 @@ public class FindCoordinatorResponse extends AbstractResponse {
         return throttleTimeMs;
     }
 
+    public boolean hasError() {
+        return this.error != Errors.NONE;
+    }
+
     public Errors error() {
         return error;
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 3566f83..8363079 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
@@ -1071,6 +1072,10 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().setNode(env.cluster().controller());
 
+            //Retriable FindCoordinatorResponse errors should be retried
+            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
+
             env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             final Map<String, Errors> response = new HashMap<>();
@@ -1081,6 +1086,13 @@ public class KafkaAdminClientTest {
 
             final KafkaFuture<Void> results = result.deletedGroups().get("group-0");
             assertNull(results.get());
+
+            //should throw error for non-retriable errors
+            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode()));
+
+            final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
+            assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
+
         }
     }
 
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index fe98fda..9055e68 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -29,12 +29,13 @@ import kafka.log.LogConfig
 import kafka.server.{Defaults, KafkaConfig, KafkaServer}
 import org.apache.kafka.clients.admin._
 import kafka.utils.{Logging, TestUtils}
+import kafka.utils.TestUtils._
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.{ConsumerGroupState, KafkaFuture, TopicPartition, TopicPartitionReplica}
+import org.apache.kafka.common.{ConsumerGroupState, TopicPartition, TopicPartitionReplica}
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
@@ -125,18 +126,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
       }, "timed out waiting for topics")
   }
 
-  def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = {
-    try {
-      future.get()
-      fail("Expected CompletableFuture.get to return an exception")
-    } catch {
-      case e: ExecutionException =>
-        val cause = e.getCause()
-        assertTrue("Expected an exception of type " + clazz.getName + "; got type " +
-            cause.getClass().getName, clazz.isInstance(cause))
-    }
-  }
-
   @Test
   def testClose(): Unit = {
     val client = AdminClient.create(createConfig())
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a21affc..72b3b24 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -19,7 +19,6 @@ import java.util.regex.Pattern
 import java.util.{ArrayList, Collections, Properties}
 import java.time.Duration
 
-import kafka.admin.AdminClient
 import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService}
 import kafka.common.TopicAndPartition
 import kafka.log.LogConfig
@@ -27,7 +26,7 @@ import kafka.network.SocketServer
 import kafka.security.auth._
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.NewPartitions
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.producer._
@@ -1004,17 +1003,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     this.consumers.head.partitionsFor(topic)
   }
 
-  @Test(expected = classOf[GroupAuthorizationException])
+  @Test
   def testDescribeGroupApiWithNoGroupAcl() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
-    createAdminClient().describeConsumerGroup(group)
+    val result = createAdminClient().describeConsumerGroups(Seq(group).asJava)
+    TestUtils.assertFutureExceptionTypeEquals(result.describedGroups().get(group), classOf[GroupAuthorizationException])
   }
 
   @Test
   def testDescribeGroupApiWithGroupDescribe() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
-    createAdminClient().describeConsumerGroup(group)
+    createAdminClient().describeConsumerGroups(Seq(group).asJava).describedGroups().get(group).get()
   }
 
   @Test
@@ -1036,8 +1036,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), groupResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
-    val result = createAdminClient().deleteConsumerGroups(List(group))
-    assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NONE))
+    createAdminClient().deleteConsumerGroups(Seq(group).asJava).deletedGroups().get(group).get()
   }
 
   @Test
@@ -1046,14 +1045,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
-    val result = createAdminClient().deleteConsumerGroups(List(group))
-    assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED))
+    val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava)
+    TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group), classOf[GroupAuthorizationException])
   }
 
   @Test
   def testDeleteGroupApiWithNoDeleteGroupAcl2() {
-    val result = createAdminClient().deleteConsumerGroups(List(group))
-    assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED))
+    val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava)
+    TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group), classOf[GroupAuthorizationException])
   }
 
   @Test
@@ -1462,7 +1461,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createAdminClient(): AdminClient = {
-    val adminClient = AdminClient.createSimplePlaintext(brokerList)
+    val props = new Properties()
+    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val adminClient = AdminClient.create(props)
     adminClients += adminClient
     adminClient
   }
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index f200cc2..07cbf0c 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -14,28 +14,35 @@
 package kafka.api
 
 import java.util.concurrent._
-import java.util.{Collection, Collections}
+import java.util.{Collection, Collections, Properties}
 
-import kafka.admin.AdminClient
-import kafka.server.KafkaConfig
+import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.{CoreUtils, Logging, ShutdownableThread, TestUtils}
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse}
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.Assert._
 import org.junit.{After, Before, Ignore, Test}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.Buffer
 
 
 /**
  * Integration tests for the consumer that cover basic usage as well as server failures
  */
-class ConsumerBounceTest extends IntegrationTestHarness with Logging {
+class ConsumerBounceTest extends BaseRequestTest with Logging {
+
+  override def numBrokers: Int = 3
 
   val producerCount = 1
   val consumerCount = 2
-  val serverCount = 3
+
+  val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+  val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
 
   val topic = "topic"
   val part = 0
@@ -45,13 +52,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   val gracefulCloseTimeMs = 1000
   val executor = Executors.newScheduledThreadPool(2)
 
-  // configure the servers and clients
-  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
-  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-  this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
-  this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
-  this.serverConfig.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
-  this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
+  val producerConfig = new Properties
+  val consumerConfig = new Properties
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
   this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
@@ -59,8 +61,19 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000")
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
 
+  def serverConfig(): Properties = {
+    val properties = new Properties
+    properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
+    properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+    properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
+    properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
+    properties.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
+    properties.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")
+    properties
+  }
+
   override def generateConfigs = {
-    FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect, enableControlledShutdown = false)
+    FixedPortTestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false)
       .map(KafkaConfig.fromProps(_, serverConfig))
   }
 
@@ -68,8 +81,26 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   override def setUp() {
     super.setUp()
 
+    for (_ <- 0 until producerCount)
+      producers += createProducer
+
+    for (_ <- 0 until consumerCount)
+      consumers += createConsumer
+
     // create the test topic with all the brokers as replicas
-    createTopic(topic, 1, serverCount)
+    createTopic(topic, 1, numBrokers)
+  }
+
+  def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
+    TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
+        securityProtocol = SecurityProtocol.PLAINTEXT,
+        props = Some(producerConfig))
+  }
+
+  def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
+    TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers),
+        securityProtocol = SecurityProtocol.PLAINTEXT,
+        props = Some(consumerConfig))
   }
 
   @After
@@ -78,6 +109,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
       executor.shutdownNow()
       // Wait for any active tasks to terminate to ensure consumer is not closed while being used from another thread
       assertTrue("Executor did not terminate", executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
+      producers.foreach(_.close())
+      consumers.foreach(_.close())
     } finally {
       super.tearDown()
     }
@@ -173,7 +206,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     val consumer = this.consumers.head
     consumer.subscribe(Collections.singleton(newtopic))
     executor.schedule(new Runnable {
-        def run() = createTopic(newtopic, numPartitions = serverCount, replicationFactor = serverCount)
+        def run() = createTopic(newtopic, numPartitions = numBrokers, replicationFactor = numBrokers)
       }, 2, TimeUnit.SECONDS)
     consumer.poll(0)
 
@@ -243,9 +276,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     val consumer1 = createConsumerAndReceive(dynamicGroup, false, numRecords)
     val consumer2 = createConsumerAndReceive(manualGroup, true, numRecords)
 
-    val adminClient = AdminClient.createSimplePlaintext(this.brokerList)
-    killBroker(adminClient.findCoordinator(dynamicGroup).id)
-    killBroker(adminClient.findCoordinator(manualGroup).id)
+    killBroker(findCoordinator(dynamicGroup))
+    killBroker(findCoordinator(manualGroup))
 
     val future1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
     val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(gracefulCloseTimeMs))
@@ -255,9 +287,16 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     restartDeadBrokers()
     checkClosedState(dynamicGroup, 0)
     checkClosedState(manualGroup, numRecords)
-    adminClient.close()
   }
 
+  private def findCoordinator(group: String) : Int = {
+    val request = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, group).build()
+    val resp = connectAndSend(request, ApiKeys.FIND_COORDINATOR)
+    val response = FindCoordinatorResponse.parse(resp, ApiKeys.FIND_COORDINATOR.latestVersion())
+    response.node().id()
+  }
+
+
   /**
    * Consumer is closed while all brokers are unavailable. Cannot rebalance or commit offsets since
    * there is no coordinator, but close should timeout and return. If close is invoked with a very
@@ -288,7 +327,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   @Test
   def testCloseDuringRebalance() {
     val topic = "closetest"
-    createTopic(topic, 10, serverCount)
+    createTopic(topic, 10, numBrokers)
     this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -355,7 +394,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
 
   private def createConsumer(groupId: String) : KafkaConsumer[Array[Byte], Array[Byte]] = {
     this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
-    val consumer = super.createConsumer
+    val consumer = createConsumer
     consumers += consumer
     consumer
   }
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 9da6937..3b63613 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -18,6 +18,8 @@ import java.util
 import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, ClusterAction, Create, Delete, Deny, Describe, Group, Operation, PermissionType, SimpleAclAuthorizer, Topic, Acl => AuthAcl, Resource => AuthResource}
 import kafka.server.KafkaConfig
 import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
+import kafka.utils.TestUtils._
+
 import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions}
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index db45196..7b68cc0 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -25,7 +25,7 @@ import java.nio.file.{Files, StandardOpenOption}
 import java.security.cert.X509Certificate
 import java.time.Duration
 import java.util.{Collections, Properties}
-import java.util.concurrent.{Callable, Executors, TimeUnit}
+import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
 
 import javax.net.ssl.X509TrustManager
 import kafka.api._
@@ -41,7 +41,7 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.{AdminClient, AlterConfigsResult, Config, ConfigEntry}
 import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata, RangeAssignor}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaFuture, TopicPartition}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.internals.Topic
@@ -1374,4 +1374,15 @@ object TestUtils extends Logging {
     (out.toString, err.toString)
   }
 
+  def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = {
+    try {
+      future.get()
+      fail("Expected CompletableFuture.get to return an exception")
+    } catch {
+      case e: ExecutionException =>
+        val cause = e.getCause()
+        assertTrue("Expected an exception of type " + clazz.getName + "; got type " +
+            cause.getClass().getName, clazz.isInstance(cause))
+    }
+  }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 249e2c3..64b23cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -17,12 +17,12 @@
 package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -61,9 +61,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import kafka.admin.AdminClient;
 import kafka.tools.StreamsResetter;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -77,20 +77,15 @@ public abstract class AbstractResetIntegrationTest {
     private static MockTime mockTime;
     private static KafkaStreams streams;
     private static AdminClient adminClient = null;
-    private static KafkaAdminClient kafkaAdminClient = null;
 
     abstract Map<String, Object> getClientSslConfig();
 
     @AfterClass
     public static void afterClassCleanup() {
         if (adminClient != null) {
-            adminClient.close();
+            adminClient.close(10, TimeUnit.SECONDS);
             adminClient = null;
         }
-        if (kafkaAdminClient != null) {
-            kafkaAdminClient.close(10, TimeUnit.SECONDS);
-            kafkaAdminClient = null;
-        }
     }
 
     private String appID = "abstract-reset-integration-test";
@@ -103,9 +98,6 @@ public abstract class AbstractResetIntegrationTest {
         if (adminClient == null) {
             adminClient = AdminClient.create(commonClientConfig);
         }
-        if (kafkaAdminClient == null) {
-            kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig);
-        }
 
         boolean timeSet = false;
         while (!timeSet) {
@@ -184,8 +176,9 @@ public abstract class AbstractResetIntegrationTest {
         @Override
         public boolean conditionMet() {
             try {
-                return adminClient.describeConsumerGroup(appID, 0).consumers().get().isEmpty();
-            } catch (final TimeoutException e) {
+                ConsumerGroupDescription groupDescription = adminClient.describeConsumerGroups(Collections.singletonList(appID)).describedGroups().get(appID).get();
+                return groupDescription.members().isEmpty();
+            } catch (final ExecutionException | InterruptedException e) {
                 return false;
             }
         }


Mime
View raw message