kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From boy...@apache.org
Subject [kafka] branch trunk updated: KAFKA-10270: A broker to controller channel manager (#9012)
Date Wed, 29 Jul 2020 18:41:40 GMT
This is an automated email from the ASF dual-hosted git repository.

boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new de2e693  KAFKA-10270: A broker to controller channel manager (#9012)
de2e693 is described below

commit de2e6938c8648f02254a645a8fff9c2fa8364ef1
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Wed Jul 29 11:40:14 2020 -0700

    KAFKA-10270: A broker to controller channel manager (#9012)
    
    Add a broker to controller channel manager for use cases such as redirection and AlterIsr.
    
    Reviewers: David Arthur <mumrah@gmail.com>, Colin P. McCabe <cmccabe@apache.org>,
Ismael Juma <ismael@juma.me.uk>
    
    Co-authored-by: Viktor Somogyi <viktorsomogyi@gmail.com>
    Co-authored-by: Boyang Chen <boyang@confluent.io>
---
 .../kafka/clients/ManualMetadataUpdater.java       |   2 +-
 .../scala/kafka/common/InterBrokerSendThread.scala |  16 +-
 .../server/BrokerToControllerChannelManager.scala  | 187 ++++++++++++++++++++
 core/src/main/scala/kafka/server/KafkaServer.scala |   4 +
 .../main/scala/kafka/server/MetadataCache.scala    |   3 +-
 .../kafka/common/InterBrokerSendThreadTest.scala   |  26 +--
 .../BrokerToControllerRequestThreadTest.scala      | 193 +++++++++++++++++++++
 7 files changed, 413 insertions(+), 18 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
index c1c1fba..3d51549 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -39,7 +39,7 @@ public class ManualMetadataUpdater implements MetadataUpdater {
     private List<Node> nodes;
 
     public ManualMetadataUpdater() {
-        this(new ArrayList<Node>(0));
+        this(new ArrayList<>(0));
     }
 
     public ManualMetadataUpdater(List<Node> nodes) {
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index 3eff03d..11e1aa8 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -20,7 +20,7 @@ import java.util.{ArrayDeque, ArrayList, Collection, Collections, HashMap,
Itera
 import java.util.Map.Entry
 
 import kafka.utils.ShutdownableThread
-import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient, RequestCompletionHandler}
+import org.apache.kafka.clients.{ClientRequest, ClientResponse, KafkaClient, RequestCompletionHandler}
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.errors.AuthenticationException
 import org.apache.kafka.common.internals.FatalExitError
@@ -33,7 +33,7 @@ import scala.jdk.CollectionConverters._
  *  Class for inter-broker send thread that utilize a non-blocking network client.
  */
 abstract class InterBrokerSendThread(name: String,
-                                     networkClient: NetworkClient,
+                                     networkClient: KafkaClient,
                                      time: Time,
                                      isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {
@@ -57,8 +57,13 @@ abstract class InterBrokerSendThread(name: String,
     generateRequests().foreach { request =>
       val completionHandler = request.handler
       unsentRequests.put(request.destination,
-        networkClient.newClientRequest(request.destination.idString, request.request, now,
true,
-          requestTimeoutMs, completionHandler))
+        networkClient.newClientRequest(
+          request.destination.idString,
+          request.request,
+          now,
+          true,
+          requestTimeoutMs,
+          completionHandler))
     }
 
     try {
@@ -138,7 +143,8 @@ abstract class InterBrokerSendThread(name: String,
   def wakeup(): Unit = networkClient.wakeup()
 }
 
-case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_
<: AbstractRequest],
+case class RequestAndCompletionHandler(destination: Node,
+                                       request: AbstractRequest.Builder[_ <: AbstractRequest],
                                        handler: RequestCompletionHandler)
 
 private class UnsentRequests {
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
new file mode 100644
index 0000000..de092cc
--- /dev/null
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import kafka.utils.Logging
+import org.apache.kafka.clients._
+import org.apache.kafka.common.requests.AbstractRequest
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.security.JaasContext
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class manages the connection between a broker and the controller. It runs a single
+ * {@link BrokerToControllerRequestThread} which uses the broker's metadata cache as its
own metadata to find
+ * and connect to the controller. The channel is async and runs the network connection in
the background.
+ * The maximum number of in-flight requests are set to one to ensure orderly response from
the controller, therefore
+ * care must be taken to not block on outstanding requests for too long.
+ */
+class BrokerToControllerChannelManager(metadataCache: kafka.server.MetadataCache,
+                                       time: Time,
+                                       metrics: Metrics,
+                                       config: KafkaConfig,
+                                       threadNamePrefix: Option[String] = None) extends Logging
{
+  private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]
+  private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ")
+  private val manualMetadataUpdater = new ManualMetadataUpdater()
+  private val requestThread = newRequestThread
+
+  def start(): Unit = {
+    requestThread.start()
+  }
+
+  def shutdown(): Unit = {
+    requestThread.shutdown()
+    requestThread.awaitShutdown()
+  }
+
+  private[server] def newRequestThread = {
+    val brokerToControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+    val brokerToControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+
+    val networkClient = {
+      val channelBuilder = ChannelBuilders.clientChannelBuilder(
+        brokerToControllerSecurityProtocol,
+        JaasContext.Type.SERVER,
+        config,
+        brokerToControllerListenerName,
+        config.saslMechanismInterBrokerProtocol,
+        time,
+        config.saslInterBrokerHandshakeRequestEnable,
+        logContext
+      )
+      val selector = new Selector(
+        NetworkReceive.UNLIMITED,
+        Selector.NO_IDLE_TIMEOUT_MS,
+        metrics,
+        time,
+        "BrokerToControllerChannel",
+        Map("BrokerId" -> config.brokerId.toString).asJava,
+        false,
+        channelBuilder,
+        logContext
+      )
+      new NetworkClient(
+        selector,
+        manualMetadataUpdater,
+        config.brokerId.toString,
+        1,
+        0,
+        0,
+        Selectable.USE_DEFAULT_BUFFER_SIZE,
+        Selectable.USE_DEFAULT_BUFFER_SIZE,
+        config.requestTimeoutMs,
+        config.connectionSetupTimeoutMs,
+        config.connectionSetupTimeoutMaxMs,
+        ClientDnsLookup.USE_ALL_DNS_IPS,
+        time,
+        false,
+        new ApiVersions,
+        logContext
+      )
+    }
+    val threadName = threadNamePrefix match {
+      case None => s"broker-${config.brokerId}-to-controller-send-thread"
+      case Some(name) => s"$name:broker-${config.brokerId}-to-controller-send-thread"
+    }
+
+    new BrokerToControllerRequestThread(networkClient, manualMetadataUpdater, requestQueue,
metadataCache, config,
+      brokerToControllerListenerName, time, threadName)
+  }
+
+  private[server] def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
+                                  callback: RequestCompletionHandler): Unit = {
+    requestQueue.put(BrokerToControllerQueueItem(request, callback))
+  }
+}
+
+case class BrokerToControllerQueueItem(request: AbstractRequest.Builder[_ <: AbstractRequest],
+                                       callback: RequestCompletionHandler)
+
+class BrokerToControllerRequestThread(networkClient: KafkaClient,
+                                      metadataUpdater: ManualMetadataUpdater,
+                                      requestQueue: LinkedBlockingDeque[BrokerToControllerQueueItem],
+                                      metadataCache: kafka.server.MetadataCache,
+                                      config: KafkaConfig,
+                                      listenerName: ListenerName,
+                                      time: Time,
+                                      threadName: String)
+  extends InterBrokerSendThread(threadName, networkClient, time, isInterruptible = false)
{
+
+  private var activeController: Option[Node] = None
+
+  override def requestTimeoutMs: Int = config.controllerSocketTimeoutMs
+
+  override def generateRequests(): Iterable[RequestAndCompletionHandler] = {
+    val requestsToSend = new mutable.Queue[RequestAndCompletionHandler]
+    val topRequest = requestQueue.poll()
+    if (topRequest != null) {
+      val request = RequestAndCompletionHandler(
+        activeController.get,
+        topRequest.request,
+        handleResponse(topRequest),
+        )
+      requestsToSend.enqueue(request)
+    }
+    requestsToSend
+  }
+
+  private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse):
Unit = {
+    if (response.wasDisconnected()) {
+      activeController = None
+      requestQueue.putFirst(request)
+    } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER))
{
+      // just close the controller connection and wait for metadata cache update in doWork
+      networkClient.close(activeController.get.idString)
+      activeController = None
+      requestQueue.putFirst(request)
+    } else {
+      request.callback.onComplete(response)
+    }
+  }
+
+  private[server] def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS)
+
+  override def doWork(): Unit = {
+    if (activeController.isDefined) {
+      super.doWork()
+    } else {
+      debug("Controller isn't cached, looking for local metadata changes")
+      val controllerOpt = metadataCache.getControllerId.flatMap(metadataCache.getAliveBroker)
+      if (controllerOpt.isDefined) {
+        if (activeController.isEmpty || activeController.exists(_.id != controllerOpt.get.id))
+          info(s"Recorded new controller, from now on will use broker ${controllerOpt.get.id}")
+        activeController = Option(controllerOpt.get.node(listenerName))
+        metadataUpdater.setNodes(metadataCache.getAliveBrokers.map(_.node(listenerName)).asJava)
+      } else {
+        // need to backoff to avoid tight loops
+        debug("No controller defined in metadata cache, retrying after backoff")
+        backoff()
+      }
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 86b841f..c3ab250 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -168,6 +168,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   var kafkaController: KafkaController = null
 
+  var brokerToControllerChannelManager: BrokerToControllerChannelManager = null
+
   var kafkaScheduler: KafkaScheduler = null
 
   var metadataCache: MetadataCache = null
@@ -314,6 +316,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo,
brokerEpoch, tokenManager, threadNamePrefix)
         kafkaController.startup()
 
+        brokerToControllerChannelManager = new BrokerToControllerChannelManager(metadataCache,
time, metrics, config, threadNamePrefix)
+
         adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
 
         /* start group coordinator */
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index bdc1be9..79d84ea 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import java.util
-import java.util.{Collections}
+import java.util.Collections
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.collection.{mutable, Seq, Set}
@@ -38,7 +38,6 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
-
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache is updated
through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
index ae372e0..f5110bf 100644
--- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
+++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
@@ -64,10 +64,10 @@ class InterBrokerSendThreadTest {
       override def generateRequests() = List[RequestAndCompletionHandler](handler)
     }
 
-    val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true,
-      requestTimeoutMs, handler.handler)
+    val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs,
handler.handler)
 
-    EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"),
+    EasyMock.expect(networkClient.newClientRequest(
+      EasyMock.eq("1"),
       EasyMock.same(handler.request),
       EasyMock.anyLong(),
       EasyMock.eq(true),
@@ -101,10 +101,10 @@ class InterBrokerSendThreadTest {
       override def generateRequests() = List[RequestAndCompletionHandler](requestAndCompletionHandler)
     }
 
-    val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true,
-      requestTimeoutMs, requestAndCompletionHandler.handler)
+    val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs,
requestAndCompletionHandler.handler)
 
-    EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"),
+    EasyMock.expect(networkClient.newClientRequest(
+      EasyMock.eq("1"),
       EasyMock.same(requestAndCompletionHandler.request),
       EasyMock.anyLong(),
       EasyMock.eq(true),
@@ -145,11 +145,18 @@ class InterBrokerSendThreadTest {
       override def generateRequests() = List[RequestAndCompletionHandler](handler)
     }
 
-    val clientRequest = new ClientRequest("dest", request, 0, "1", time.milliseconds(), true,
-      requestTimeoutMs, handler.handler)
+    val clientRequest = new ClientRequest("dest",
+      request,
+      0,
+      "1",
+      time.milliseconds(),
+      true,
+      requestTimeoutMs,
+      handler.handler)
     time.sleep(1500)
 
-    EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"),
+    EasyMock.expect(networkClient.newClientRequest(
+      EasyMock.eq("1"),
       EasyMock.same(handler.request),
       EasyMock.eq(time.milliseconds()),
       EasyMock.eq(true),
@@ -180,7 +187,6 @@ class InterBrokerSendThreadTest {
     Assert.assertTrue(completionHandler.executedWithDisconnectedResponse)
   }
 
-
   private class StubRequestBuilder extends AbstractRequest.Builder(ApiKeys.END_TXN) {
     override def build(version: Short): Nothing = ???
   }
diff --git a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
new file mode 100644
index 0000000..acc1fcf
--- /dev/null
+++ b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingDeque, TimeUnit}
+import java.util.Collections
+
+import kafka.cluster.{Broker, EndPoint}
+import kafka.utils.TestUtils
+import org.apache.kafka.test.{TestUtils => ClientsTestUtils}
+import org.apache.kafka.clients.{ManualMetadataUpdater, Metadata, MockClient}
+import org.apache.kafka.common.feature.Features
+import org.apache.kafka.common.feature.Features.emptySupportedFeatures
+import org.apache.kafka.common.utils.SystemTime
+import org.apache.kafka.common.message.MetadataRequestData
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AbstractRequest, MetadataRequest}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Test
+import org.mockito.Mockito._
+
+class BrokerToControllerRequestThreadTest {
+
+  @Test
+  def testRequestsSent(): Unit = {
+    // just a simple test that tests whether the request from 1 -> 2 is sent and the response
callback is called
+    val time = new SystemTime
+    val config = new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))
+    val controllerId = 2
+
+    val metadata = mock(classOf[Metadata])
+    val mockClient = new MockClient(time, metadata)
+
+    val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
+    val metadataCache = mock(classOf[MetadataCache])
+    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    val activeController = new Broker(controllerId,
+      Seq(new EndPoint("host", 1234, listenerName, SecurityProtocol.PLAINTEXT)), None, emptySupportedFeatures)
+
+    when(metadataCache.getControllerId).thenReturn(Some(controllerId))
+    when(metadataCache.getAliveBrokers).thenReturn(Seq(activeController))
+    when(metadataCache.getAliveBroker(controllerId)).thenReturn(Some(activeController))
+
+    val expectedResponse = ClientsTestUtils.metadataUpdateWith(2, Collections.singletonMap("a",
new Integer(2)))
+    val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(),
requestQueue, metadataCache,
+      config, listenerName, time, "")
+    mockClient.prepareResponse(expectedResponse)
+
+    val responseLatch = new CountDownLatch(1)
+    val queueItem = BrokerToControllerQueueItem(
+      new MetadataRequest.Builder(new MetadataRequestData()), response => {
+        assertEquals(expectedResponse, response.responseBody())
+        responseLatch.countDown()
+      })
+    requestQueue.put(queueItem)
+    // initialize to the controller
+    testRequestThread.doWork()
+    // send and process the request
+    testRequestThread.doWork()
+
+    assertTrue(responseLatch.await(10, TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testControllerChanged(): Unit = {
+    // in this test the current broker is 1, and the controller changes from 2 -> 3 then
back: 3 -> 2
+    val time = new SystemTime
+    val config = new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))
+    val oldControllerId = 1
+    val newControllerId = 2
+
+    val metadata = mock(classOf[Metadata])
+    val mockClient = new MockClient(time, metadata)
+
+    val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
+    val metadataCache = mock(classOf[MetadataCache])
+    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    val oldController = new Broker(oldControllerId,
+      Seq(new EndPoint("host1", 1234, listenerName, SecurityProtocol.PLAINTEXT)), None, Features.emptySupportedFeatures)
+    val oldControllerNode = oldController.node(listenerName)
+    val newController = new Broker(newControllerId,
+      Seq(new EndPoint("host2", 1234, listenerName, SecurityProtocol.PLAINTEXT)), None, Features.emptySupportedFeatures)
+
+    when(metadataCache.getControllerId).thenReturn(Some(oldControllerId), Some(newControllerId))
+    when(metadataCache.getAliveBroker(oldControllerId)).thenReturn(Some(oldController))
+    when(metadataCache.getAliveBroker(newControllerId)).thenReturn(Some(newController))
+    when(metadataCache.getAliveBrokers).thenReturn(Seq(oldController, newController))
+
+    val expectedResponse = ClientsTestUtils.metadataUpdateWith(3, Collections.singletonMap("a",
new Integer(2)))
+    val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(),
+      requestQueue, metadataCache, config, listenerName, time, "")
+
+    val responseLatch = new CountDownLatch(1)
+
+    val queueItem = BrokerToControllerQueueItem(
+      new MetadataRequest.Builder(new MetadataRequestData()), response => {
+        assertEquals(expectedResponse, response.responseBody())
+        responseLatch.countDown()
+      })
+    requestQueue.put(queueItem)
+    mockClient.prepareResponse(expectedResponse)
+    // initialize the thread with oldController
+    testRequestThread.doWork()
+    // assert queue correctness
+    assertFalse(requestQueue.isEmpty)
+    assertEquals(1, requestQueue.size())
+    assertEquals(queueItem, requestQueue.peek())
+    // disconnect the node
+    mockClient.setUnreachable(oldControllerNode, time.milliseconds() + 5000)
+    // verify that the client closed the connection to the faulty controller
+    testRequestThread.doWork()
+    assertFalse(requestQueue.isEmpty)
+    assertEquals(1, requestQueue.size())
+    // should connect to the new controller
+    testRequestThread.doWork()
+    // should send the request and process the response
+    testRequestThread.doWork()
+
+    assertTrue(responseLatch.await(10, TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testNotController(): Unit = {
+    val time = new SystemTime
+    val config = new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))
+    val oldControllerId = 1
+    val newControllerId = 2
+
+    val metadata = mock(classOf[Metadata])
+    val mockClient = new MockClient(time, metadata)
+
+    val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
+    val metadataCache = mock(classOf[MetadataCache])
+    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    val oldController = new Broker(oldControllerId,
+      Seq(new EndPoint("host1", 1234, listenerName, SecurityProtocol.PLAINTEXT)), None, Features.emptySupportedFeatures)
+    val newController = new Broker(2,
+      Seq(new EndPoint("host2", 1234, listenerName, SecurityProtocol.PLAINTEXT)), None, Features.emptySupportedFeatures)
+
+    when(metadataCache.getControllerId).thenReturn(Some(oldControllerId), Some(newControllerId))
+    when(metadataCache.getAliveBrokers).thenReturn(Seq(oldController, newController))
+    when(metadataCache.getAliveBroker(oldControllerId)).thenReturn(Some(oldController))
+    when(metadataCache.getAliveBroker(newControllerId)).thenReturn(Some(newController))
+
+    val responseWithNotControllerError = ClientsTestUtils.metadataUpdateWith("cluster1",
2,
+      Collections.singletonMap("a", Errors.NOT_CONTROLLER),
+      Collections.singletonMap("a", new Integer(2)))
+    val expectedResponse = ClientsTestUtils.metadataUpdateWith(3, Collections.singletonMap("a",
new Integer(2)))
+    val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(),
requestQueue, metadataCache,
+      config, listenerName, time, "")
+
+    val responseLatch = new CountDownLatch(1)
+    val queueItem = BrokerToControllerQueueItem(
+      new MetadataRequest.Builder(new MetadataRequestData()
+        .setAllowAutoTopicCreation(true)), response => {
+        assertEquals(expectedResponse, response.responseBody())
+        responseLatch.countDown()
+      })
+    requestQueue.put(queueItem)
+    // initialize to the controller
+    testRequestThread.doWork()
+    // send and process the request
+    mockClient.prepareResponse((body: AbstractRequest) => {
+      body.isInstanceOf[MetadataRequest] &&
+      body.asInstanceOf[MetadataRequest].allowAutoTopicCreation()
+    }, responseWithNotControllerError)
+    testRequestThread.doWork()
+    // reinitialize the controller to a different node
+    testRequestThread.doWork()
+    // process the request again
+    mockClient.prepareResponse(expectedResponse)
+    testRequestThread.doWork()
+
+    assertTrue(responseLatch.await(10, TimeUnit.SECONDS))
+  }
+}


Mime
View raw message