kafka-commits mailing list archives

From: jun...@apache.org
Subject: [1/3] kafka git commit: KAFKA-2411; remove usage of blocking channel
Date: Wed, 02 Sep 2015 18:55:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d0adf6abe -> d02ca36ca


http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 039c7eb..756cf77 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -17,33 +17,39 @@
 
 package kafka.server
 
+import java.net.{SocketTimeoutException}
 import java.util
 
 import kafka.admin._
+import kafka.api.{KAFKA_083, ApiVersion}
 import kafka.log.LogConfig
 import kafka.log.CleanerConfig
 import kafka.log.LogManager
 import java.util.concurrent._
 import atomic.{AtomicInteger, AtomicBoolean}
-import java.io.File
+import java.io.{IOException, File}
 
 import kafka.utils._
+import org.apache.kafka.clients.{ManualMetadataUpdater, ClientRequest, NetworkClient}
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.network.NetworkReceive
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.network.{Selectable, ChannelBuilders, NetworkReceive, Selector}
+import org.apache.kafka.common.protocol.{Errors, ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
+import org.apache.kafka.common.requests.{ControlledShutdownResponse, ControlledShutdownRequest, RequestSend}
+import org.apache.kafka.common.security.ssl.SSLFactory
 import org.apache.kafka.common.utils.AppInfoParser
 
 import scala.collection.mutable
+import scala.collection.JavaConverters._
 import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
 import kafka.cluster.{EndPoint, Broker}
-import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
 import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException}
 import kafka.network.{BlockingChannel, SocketServer}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
-import kafka.coordinator.{GroupManagerConfig, ConsumerCoordinator}
+import kafka.coordinator.{ConsumerCoordinator}
 
 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
@@ -92,7 +98,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   // This exists because the Metrics package from clients has its own Time implementation.
   // SocketServer/Quotas (which uses client libraries) have to use the client Time objects without having to convert all of Kafka to use them
   // Eventually, we want to merge the Time objects in core and clients
-  private val kafkaMetricsTime: org.apache.kafka.common.utils.Time = new org.apache.kafka.common.utils.SystemTime()
+  private implicit val kafkaMetricsTime: org.apache.kafka.common.utils.Time = new org.apache.kafka.common.utils.SystemTime()
   var metrics: Metrics = null
 
   private val metricConfig: MetricConfig = new MetricConfig()
@@ -175,7 +181,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         replicaManager.startup()
 
         /* start kafka controller */
-        kafkaController = new KafkaController(config, zkClient, brokerState)
+        kafkaController = new KafkaController(config, zkClient, brokerState, kafkaMetricsTime, metrics)
         kafkaController.startup()
 
         /* start kafka coordinator */
@@ -262,17 +268,126 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
    *  Performs controlled shutdown
    */
   private def controlledShutdown() {
-    if (startupComplete.get() && config.controlledShutdownEnable) {
-      // We request the controller to do a controlled shutdown. On failure, we backoff for a configured period
-      // of time and try again for a configured number of retries. If all the attempt fails, we simply force
-      // the shutdown.
-      var remainingRetries = config.controlledShutdownMaxRetries
-      info("Starting controlled shutdown")
-      var channel : BlockingChannel = null
-      var prevController : Broker = null
-      var shutdownSucceeded : Boolean = false
+
+    def node(broker: Broker): Node = {
+      val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
+      new Node(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
+    }
+
+    val socketTimeoutMs = config.controllerSocketTimeoutMs
+
+    def socketTimeoutException: Throwable =
+      new SocketTimeoutException(s"Did not receive response within $socketTimeoutMs")
+
+    def networkClientControlledShutdown(retries: Int): Boolean = {
+      val metadataUpdater = new ManualMetadataUpdater()
+      val networkClient = {
+        val selector = new Selector(
+          NetworkReceive.UNLIMITED,
+          config.connectionsMaxIdleMs,
+          metrics,
+          kafkaMetricsTime,
+          "kafka-server-controlled-shutdown",
+          Map.empty.asJava,
+          false,
+          ChannelBuilders.create(config.interBrokerSecurityProtocol, SSLFactory.Mode.CLIENT, config.channelConfigs)
+        )
+        new NetworkClient(
+          selector,
+          metadataUpdater,
+          config.brokerId.toString,
+          1,
+          0,
+          Selectable.USE_DEFAULT_BUFFER_SIZE,
+          Selectable.USE_DEFAULT_BUFFER_SIZE)
+      }
+
+      var shutdownSucceeded: Boolean = false
+
+      try {
+
+        var remainingRetries = retries
+        var prevController: Broker = null
+        var ioException = false
+
+        while (!shutdownSucceeded && remainingRetries > 0) {
+          remainingRetries = remainingRetries - 1
+
+          import NetworkClientBlockingOps._
+
+          // 1. Find the controller and establish a connection to it.
+
+          // Get the current controller info. This is to ensure we use the most recent info to issue the
+          // controlled shutdown request
+          val controllerId = ZkUtils.getController(zkClient)
+          ZkUtils.getBrokerInfo(zkClient, controllerId) match {
+            case Some(broker) =>
+              // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
+              // attempt, connect to the most recent controller
+              if (ioException || broker != prevController) {
+
+                ioException = false
+
+                if (prevController != null)
+                  networkClient.close(node(prevController).idString)
+
+                prevController = broker
+                metadataUpdater.setNodes(Seq(node(prevController)).asJava)
+              }
+            case None => //ignore and try again
+          }
+
+          // 2. issue a controlled shutdown to the controller
+          if (prevController != null) {
+            try {
+
+              if (!networkClient.blockingReady(node(prevController), socketTimeoutMs))
+                throw socketTimeoutException
+
+              // send the controlled shutdown request
+              val requestHeader = networkClient.nextRequestHeader(ApiKeys.CONTROLLED_SHUTDOWN_KEY)
+              val send = new RequestSend(node(prevController).idString, requestHeader,
+                new ControlledShutdownRequest(config.brokerId).toStruct)
+              val request = new ClientRequest(kafkaMetricsTime.milliseconds(), true, send, null)
+              val clientResponse = networkClient.blockingSendAndReceive(request, socketTimeoutMs).getOrElse {
+                throw socketTimeoutException
+              }
+
+              val shutdownResponse = new ControlledShutdownResponse(clientResponse.responseBody)
+              if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining.isEmpty) {
+                shutdownSucceeded = true
+                info("Controlled shutdown succeeded")
+              }
+              else {
+                info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.asScala.mkString(",")))
+                info("Error code from controller: %d".format(shutdownResponse.errorCode))
+              }
+            }
+            catch {
+              case ioe: IOException =>
+                ioException = true
+                warn("Error during controlled shutdown, possibly because leader movement
took longer than the configured socket.timeout.ms: %s".format(ioe.getMessage))
+                // ignore and try again
+            }
+          }
+          if (!shutdownSucceeded) {
+            Thread.sleep(config.controlledShutdownRetryBackoffMs)
+            warn("Retrying controlled shutdown after the previous attempt failed...")
+          }
+        }
+      }
+      finally
+        networkClient.close()
+
+      shutdownSucceeded
+    }
+
+    def blockingChannelControlledShutdown(retries: Int): Boolean = {
+      var remainingRetries = retries
+      var channel: BlockingChannel = null
+      var prevController: Broker = null
+      var shutdownSucceeded: Boolean = false
       try {
-        brokerState.newState(PendingControlledShutdown)
         while (!shutdownSucceeded && remainingRetries > 0) {
           remainingRetries = remainingRetries - 1
 
@@ -286,9 +401,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
              if (channel == null || prevController == null || !prevController.equals(broker)) {
                // if this is the first attempt or if the controller has changed, create a channel to the most recent
                 // controller
-                if (channel != null) {
+                if (channel != null)
                   channel.disconnect()
-                }
+
                 channel = new BlockingChannel(broker.getBrokerEndPoint(config.interBrokerSecurityProtocol).host,
                   broker.getBrokerEndPoint(config.interBrokerSecurityProtocol).port,
                   BlockingChannel.UseDefaultBufferSize,
@@ -297,8 +412,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                 channel.connect()
                 prevController = broker
               }
-            case None=>
-              //ignore and try again
+            case None => //ignore and try again
           }
 
           // 2. issue a controlled shutdown to the controller
@@ -306,13 +420,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
             var response: NetworkReceive = null
             try {
               // send the controlled shutdown request
-              val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
+              val request = new kafka.api.ControlledShutdownRequest(0, correlationId.getAndIncrement, None, config.brokerId)
               channel.send(request)
 
               response = channel.receive()
-              val shutdownResponse = ControlledShutdownResponse.readFrom(response.payload())
+              val shutdownResponse = kafka.api.ControlledShutdownResponse.readFrom(response.payload())
              if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null &&
-                  shutdownResponse.partitionsRemaining.size == 0) {
+                shutdownResponse.partitionsRemaining.size == 0) {
                 shutdownSucceeded = true
                 info ("Controlled shutdown succeeded")
               }
@@ -341,9 +455,27 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
           channel = null
         }
       }
-      if (!shutdownSucceeded) {
+      shutdownSucceeded
+    }
+
+    if (startupComplete.get() && config.controlledShutdownEnable) {
+      // We request the controller to do a controlled shutdown. On failure, we backoff for a configured period
+      // of time and try again for a configured number of retries. If all the attempt fails, we simply force
+      // the shutdown.
+      info("Starting controlled shutdown")
+
+      brokerState.newState(PendingControlledShutdown)
+
+      val shutdownSucceeded =
+        // Before 0.8.3, `ControlledShutdownRequest` did not contain `client_id` and it's a mandatory field in
+        // `RequestHeader`, which is used by `NetworkClient`
+        if (config.interBrokerProtocolVersion.onOrAfter(KAFKA_083))
+          networkClientControlledShutdown(config.controlledShutdownMaxRetries.intValue)
+        else blockingChannelControlledShutdown(config.controlledShutdownMaxRetries.intValue)
+      
+      if (!shutdownSucceeded)
         warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts
failed")
-      }
+
     }
   }
 

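A note on the version gate in the new controlledShutdown() above: the NetworkClient-based path is only taken when the inter-broker protocol version is at least 0.8.3, because before 0.8.3 `ControlledShutdownRequest` did not contain the `client_id` that is mandatory in the `RequestHeader` used by `NetworkClient`. The following is a minimal, self-contained sketch of that dispatch; `SimpleVersion`, the println bodies and the main method are illustrative stand-ins (the real code uses kafka.api.ApiVersion / KAFKA_083 and the two methods defined in the diff above).

object ControlledShutdownDispatchSketch {

  // Stand-in for kafka.api.ApiVersion; onOrAfter mirrors the method used in the diff above.
  case class SimpleVersion(major: Int, minor: Int, micro: Int) extends Ordered[SimpleVersion] {
    def compare(that: SimpleVersion): Int =
      Ordering[(Int, Int, Int)].compare((major, minor, micro), (that.major, that.minor, that.micro))
    def onOrAfter(that: SimpleVersion): Boolean = this >= that
  }

  val KAFKA_083 = SimpleVersion(0, 8, 3)

  // Illustrative stand-ins for the two shutdown paths defined in KafkaServer.scala above.
  def networkClientControlledShutdown(retries: Int): Boolean = {
    println(s"NetworkClient path, retries=$retries"); true
  }
  def blockingChannelControlledShutdown(retries: Int): Boolean = {
    println(s"BlockingChannel path, retries=$retries"); true
  }

  // Brokers configured with inter.broker.protocol.version >= 0.8.3 can use the new wire-format
  // request (whose header carries client_id); older versions keep the BlockingChannel path.
  def controlledShutdown(interBrokerProtocolVersion: SimpleVersion, maxRetries: Int): Boolean =
    if (interBrokerProtocolVersion.onOrAfter(KAFKA_083))
      networkClientControlledShutdown(maxRetries)
    else
      blockingChannelControlledShutdown(maxRetries)

  def main(args: Array[String]): Unit = {
    controlledShutdown(SimpleVersion(0, 8, 3), maxRetries = 3) // new NetworkClient path
    controlledShutdown(SimpleVersion(0, 8, 2), maxRetries = 3) // legacy BlockingChannel path
  }
}
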
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
new file mode 100644
index 0000000..ad10721
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.io.IOException
+import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient}
+import org.apache.kafka.common.Node
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.common.utils.{Time => JTime}
+
+object NetworkClientBlockingOps {
+  implicit def networkClientBlockingOps(client: NetworkClient): NetworkClientBlockingOps =
+    new NetworkClientBlockingOps(client)
+}
+
+/**
+ * Provides extension methods for `NetworkClient` that are useful for implementing blocking behaviour. Use with care.
+ *
+ * Example usage:
+ *
+ * {{{
+ * val networkClient: NetworkClient = ...
+ * import NetworkClientBlockingOps._
+ * networkClient.blockingReady(...)
+ * }}}
+ */
+class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
+
+  /**
+   * Invokes `client.ready` followed by 0 or more `client.poll` invocations until the connection to `node` is ready,
+   * the timeout expires or the connection fails.
+   *
+   * It returns `true` if the call completes normally or `false` if the timeout expires. If the connection fails,
+   * an `IOException` is thrown instead.
+   *
+   * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
+   * care.
+   */
+  def blockingReady(node: Node, timeout: Long)(implicit time: JTime): Boolean = {
+    client.ready(node, time.milliseconds()) || pollUntil(timeout) { (_, now) =>
+      if (client.isReady(node, now))
+        true
+      else if (client.connectionFailed(node))
+        throw new IOException(s"Connection to $node failed")
+      else false
+    }
+  }
+
+  /**
+   * Invokes `client.send` followed by 1 or more `client.poll` invocations until a response is received,
+   * the timeout expires or a disconnection happens.
+   *
+   * It returns `true` if the call completes normally or `false` if the timeout expires. In the case of a disconnection,
+   * an `IOException` is thrown instead.
+   *
+   * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
+   * care.
+   */
+  def blockingSendAndReceive(request: ClientRequest, timeout: Long)(implicit time: JTime): Option[ClientResponse] = {
+    client.send(request)
+
+    pollUntilFound(timeout) { case (responses, _) =>
+      val response = responses.find { response =>
+        response.request.request.header.correlationId == request.request.header.correlationId
+      }
+      response.foreach { r =>
+        if (r.wasDisconnected) {
+          val destination = request.request.destination
+          throw new IOException(s"Connection to $destination was disconnected before the response was read")
+        }
+      }
+      response
+    }
+
+  }
+
+  /**
+   * Invokes `client.poll` until `predicate` returns `true` or the timeout expires.
+   *
+   * It returns `true` if the call completes normally or `false` if the timeout expires. Exceptions thrown via
+   * `predicate` are not handled and will bubble up.
+   *
+   * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
+   * care.
+   */
+  private def pollUntil(timeout: Long)(predicate: (Seq[ClientResponse], Long) => Boolean)(implicit time: JTime): Boolean = {
+    pollUntilFound(timeout) { (responses, now) =>
+      if (predicate(responses, now)) Some(true)
+      else None
+    }.fold(false)(_ => true)
+  }
+
+  /**
+   * Invokes `client.poll` until `collect` returns `Some` or the timeout expires.
+   *
+   * It returns the result of `collect` if the call completes normally or `None` if the timeout expires. Exceptions
+   * thrown via `collect` are not handled and will bubble up.
+   *
+   * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
+   * care.
+   */
+  private def pollUntilFound[T](timeout: Long)(collect: (Seq[ClientResponse], Long) => Option[T])(implicit time: JTime): Option[T] = {
+
+    val methodStartTime = time.milliseconds()
+    val timeoutExpiryTime = methodStartTime + timeout
+
+    @tailrec
+    def recurse(iterationStartTime: Long): Option[T] = {
+      val pollTimeout = if (timeout < 0) timeout else timeoutExpiryTime - iterationStartTime
+      val responses = client.poll(pollTimeout, iterationStartTime).asScala
+      val result = collect(responses, iterationStartTime)
+      if (result.isDefined) result
+      else {
+        val afterPollTime = time.milliseconds()
+        if (timeout < 0 || afterPollTime < timeoutExpiryTime)
+          recurse(afterPollTime)
+        else None
+      }
+    }
+
+    recurse(methodStartTime)
+  }
+
+}

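To make the scaladoc example above a little more concrete, here is a hedged usage sketch in the style of the controlled-shutdown code in KafkaServer.scala: it assumes a `NetworkClient`, target `Node` and `ClientRequest` have already been built as shown in that diff; the 30-second timeout, the method name `sendBlocking` and the error messages are illustrative only.

import java.net.SocketTimeoutException

import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient}
import org.apache.kafka.common.Node
import org.apache.kafka.common.utils.{SystemTime, Time => JTime}

import kafka.utils.NetworkClientBlockingOps._

object BlockingOpsUsageSketch {

  // The blocking ops take an implicit client-side Time, as in KafkaServer above.
  implicit val time: JTime = new SystemTime

  def sendBlocking(networkClient: NetworkClient, node: Node, request: ClientRequest): ClientResponse = {
    val timeoutMs = 30000L // illustrative timeout

    // Wait until the connection to `node` is ready; throws IOException if the connection fails.
    if (!networkClient.blockingReady(node, timeoutMs))
      throw new SocketTimeoutException(s"Node $node was not ready within $timeoutMs ms")

    // Send and poll until the matching response arrives, the timeout expires or the node disconnects.
    networkClient.blockingSendAndReceive(request, timeoutMs).getOrElse(
      throw new SocketTimeoutException(s"No response from $node within $timeoutMs ms"))
  }
}
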
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index d7112d4..985c64f 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -25,6 +25,9 @@ import kafka.common.TopicAndPartition
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils._
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.{AbstractRequestResponse, AbstractRequest}
+import org.apache.kafka.common.utils.SystemTime
 import org.apache.log4j.{Level, Logger}
 import org.junit.{After, Before, Test}
 
@@ -146,9 +149,9 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
   }
 }
 
-class MockChannelManager(private val controllerContext: ControllerContext,
-                           config: KafkaConfig)
-                           extends ControllerChannelManager(controllerContext, config) {
+class MockChannelManager(private val controllerContext: ControllerContext, config: KafkaConfig)
+  extends ControllerChannelManager(controllerContext, config, new SystemTime, new Metrics) {
+
   def stopSendThread(brokerId: Int) {
     val requestThread = brokerStateInfo(brokerId).requestSendThread
     requestThread.isRunning.set(false)
@@ -157,12 +160,9 @@ class MockChannelManager(private val controllerContext: ControllerContext,
   }
 
   def shrinkBlockingQueue(brokerId: Int) {
-    val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, RequestOrResponse => Unit)](1)
+    val messageQueue = new LinkedBlockingQueue[QueueItem](1)
     val brokerInfo = this.brokerStateInfo(brokerId)
-    this.brokerStateInfo.put(brokerId, new ControllerBrokerStateInfo(brokerInfo.channel,
-                                                                      brokerInfo.broker,
-                                                                      messageQueue,
-                                                                      brokerInfo.requestSendThread))
+    this.brokerStateInfo.put(brokerId, brokerInfo.copy(messageQueue = messageQueue))
   }
 
   def resumeSendThread (brokerId: Int) {
@@ -176,4 +176,4 @@ class MockChannelManager(private val controllerContext: ControllerContext,
   def queueSize(brokerId: Int): Int = {
     this.brokerStateInfo(brokerId).messageQueue.size
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index bb12a50..ff17830 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -17,15 +17,22 @@
 
 package kafka.server
 
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
+
+import scala.collection.JavaConverters._
+import kafka.api.{PartitionStateInfo, LeaderAndIsr}
+import org.apache.kafka.common.requests.{LeaderAndIsrResponse, LeaderAndIsrRequest, AbstractRequestResponse}
 import org.junit.Assert._
-import kafka.api._
 import kafka.utils.{TestUtils, ZkUtils, CoreUtils}
 import kafka.cluster.Broker
 import kafka.common.ErrorMapping
 import kafka.controller.{ControllerChannelManager, ControllerContext, LeaderIsrAndControllerEpoch}
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.utils.SystemTime
 import org.junit.{Test, After, Before}
 
 class LeaderElectionTest extends ZooKeeperTestHarness {
@@ -124,21 +131,26 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
 
     val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, zkConnect))
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort()))
-    val brokerEndPoints = brokers.map(b => b.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
+    val brokerEndPoints = brokers.map { b =>
+      val brokerEndPoint = b.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)
+      new LeaderAndIsrRequest.EndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
+    }
 
     val controllerContext = new ControllerContext(zkClient, 6000)
     controllerContext.liveBrokers = brokers.toSet
-    val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig)
+    val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, new SystemTime, new Metrics)
     controllerChannelManager.startup()
     val staleControllerEpoch = 0
-    val leaderAndIsr = new collection.mutable.HashMap[(String, Int), LeaderIsrAndControllerEpoch]
-    leaderAndIsr.put((topic, partitionId),
-      new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
-    val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, Set(0,1))).toMap
-    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokerEndPoints.toSet, controllerId,
-                                                      staleControllerEpoch, 0, "")
-
-    controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
+    val partitionStates = Map(
+      new TopicPartition(topic, partitionId) -> new PartitionState(2, brokerId2, LeaderAndIsr.initialLeaderEpoch,
+        Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava, LeaderAndIsr.initialZKVersion,
+        Set(0, 1).map(Integer.valueOf).asJava)
+    )
+    val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, staleControllerEpoch, partitionStates.asJava,
+      brokerEndPoints.toSet.asJava)
+
+    controllerChannelManager.sendRequest(brokerId2, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest,
+      staleControllerEpochCallback)
     TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true,
                             "Controller epoch should be stale")
     assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)
@@ -146,7 +158,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     controllerChannelManager.shutdown()
   }
 
-  private def staleControllerEpochCallback(response: RequestOrResponse): Unit = {
+  private def staleControllerEpochCallback(response: AbstractRequestResponse): Unit = {
     val leaderAndIsrResponse = response.asInstanceOf[LeaderAndIsrResponse]
     staleControllerEpochDetected = leaderAndIsrResponse.errorCode match {
       case ErrorMapping.StaleControllerEpochCode => true

