kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-6065; Latency metric for KafkaZkClient
Date Wed, 06 Dec 2017 23:14:04 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d543e19a0 -> b00a9fc7c


KAFKA-6065; Latency metric for KafkaZkClient

Measures the latency of each request.

Updated existing `ZkUtils` test to use `KafkaZkClient`
instead.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #4265 from ijuma/kafka-6065-async-zk-metrics


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b00a9fc7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b00a9fc7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b00a9fc7

Branch: refs/heads/trunk
Commit: b00a9fc7c3b62506262cbd759c35d58ad99eb17a
Parents: d543e19
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Thu Dec 7 01:13:40 2017 +0200
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Dec 7 01:13:40 2017 +0200

----------------------------------------------------------------------
 .../main/scala/kafka/admin/ConfigCommand.scala  |  7 +--
 .../main/scala/kafka/admin/TopicCommand.scala   |  7 +--
 .../security/auth/SimpleAclAuthorizer.scala     | 10 ++--
 .../main/scala/kafka/server/KafkaServer.scala   | 10 ++--
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  2 +-
 .../src/main/scala/kafka/zk/KafkaZkClient.scala | 16 ++++++-
 .../scala/kafka/zookeeper/ZooKeeperClient.scala | 49 +++++++++++++-------
 .../integration/kafka/api/MetricsTest.scala     |  2 +-
 .../controller/PartitionStateMachineTest.scala  | 14 +++---
 .../controller/ReplicaStateMachineTest.scala    |  4 +-
 .../unit/kafka/network/SocketServerTest.scala   |  1 -
 .../scala/unit/kafka/server/KafkaApisTest.scala |  3 +-
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |  8 ++--
 .../kafka/zookeeper/ZooKeeperClientTest.scala   | 12 +++--
 14 files changed, 90 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b00a9fc7/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 077ecce..53aa2c1 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -30,7 +30,7 @@ import kafka.zk.{AdminZkClient, KafkaZkClient}
 import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram._
-import org.apache.kafka.common.utils.{Sanitizer, Utils}
+import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
 
 import scala.collection._
 import scala.collection.JavaConverters._
@@ -63,8 +63,9 @@ object ConfigCommand extends Config {
 
     opts.checkArgs()
 
-    val zooKeeperClient = new ZooKeeperClient(opts.options.valueOf(opts.zkConnectOpt), 30000,
30000, Int.MaxValue)
-    val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled())
+    val time = Time.SYSTEM
+    val zooKeeperClient = new ZooKeeperClient(opts.options.valueOf(opts.zkConnectOpt), 30000,
30000, Int.MaxValue, time)
+    val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled, time)
     val adminZkClient = new AdminZkClient(zkClient)
 
     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b00a9fc7/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index bdd8aaf..dcf970a 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -31,7 +31,7 @@ import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.errors.{InvalidTopicException, TopicExistsException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.zookeeper.KeeperException.NodeExistsException
 import org.apache.kafka.common.TopicPartition
 
@@ -54,8 +54,9 @@ object TopicCommand extends Logging {
 
     opts.checkArgs()
 
-    val zooKeeperClient = new ZooKeeperClient(opts.options.valueOf(opts.zkConnectOpt), 30000,
30000, Int.MaxValue)
-    val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled())
+    val time = Time.SYSTEM
+    val zooKeeperClient = new ZooKeeperClient(opts.options.valueOf(opts.zkConnectOpt), 30000,
30000, Int.MaxValue, time)
+    val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled, time)
 
     var exitCode = 0
     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b00a9fc7/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index e1befc7..e10bfa1 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -29,7 +29,7 @@ import kafka.utils._
 import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, KafkaZkClient}
 import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.SecurityUtils
+import org.apache.kafka.common.utils.{SecurityUtils, Time}
 
 import scala.collection.JavaConverters._
 import scala.util.Random
@@ -90,9 +90,9 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     val zkSessionTimeOutMs = configs.get(SimpleAclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
     val zkMaxInFlightRequests = configs.get(SimpleAclAuthorizer.ZkMaxInFlightRequests).map(_.toString.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)
 
-    val zooKeeperClient = new ZooKeeperClient(zkUrl, zkSessionTimeOutMs, zkConnectionTimeoutMs,
zkMaxInFlightRequests)
-
-    zkClient = new KafkaZkClient(zooKeeperClient, kafkaConfig.zkEnableSecureAcls)
+    val time = Time.SYSTEM
+    val zooKeeperClient = new ZooKeeperClient(zkUrl, zkSessionTimeOutMs, zkConnectionTimeoutMs,
zkMaxInFlightRequests, time)
+    zkClient = new KafkaZkClient(zooKeeperClient, kafkaConfig.zkEnableSecureAcls, time)
     zkClient.createAclPaths()
 
     loadCache()
@@ -322,4 +322,4 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     }
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b00a9fc7/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index d026018..234923a 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -137,7 +137,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   var quotaManagers: QuotaFactory.QuotaManagers = null
 
   var zkUtils: ZkUtils = null
-  private var zkClient: KafkaZkClient = null
+  private var _zkClient: KafkaZkClient = null
   val correlationId: AtomicInteger = new AtomicInteger(0)
   val brokerMetaPropsFile = "meta.properties"
   val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new
File(logDir + File.separator + brokerMetaPropsFile)))).toMap
@@ -147,6 +147,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   def clusterId: String = _clusterId
 
+  // Visible for testing
+  private[kafka] def zkClient = _zkClient
+
   private[kafka] def brokerTopicStats = _brokerTopicStats
 
   newGauge(
@@ -221,8 +224,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
 
-        val zooKeeperClient = new ZooKeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests)
-        zkClient = new KafkaZkClient(zooKeeperClient, zkUtils.isSecure)
+        val zooKeeperClient = new ZooKeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
+          config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, time)
+        _zkClient = new KafkaZkClient(zooKeeperClient, zkUtils.isSecure, time)
 
         /* start log manager */
         logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler,
time, brokerTopicStats, logDirFailureChannel)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b00a9fc7/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 004a408..de2bc99 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -231,7 +231,7 @@ class ZooKeeperClientWrapper(val zkClient: ZkClient) {
 
 class ZooKeeperClientMetrics(zkClient: ZkClient, val time: Time)
     extends ZooKeeperClientWrapper(zkClient) with KafkaMetricsGroup {
-  val latencyMetric = newHistogram("ZooKeeperRequestLatencyMs")
+  private val latencyMetric = newHistogram("ZooKeeperRequestLatencyMs")
 
   override def metricName(name: String, metricTags: scala.collection.Map[String, String]):
MetricName = {
     explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b00a9fc7/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 55bfdfc..c035237 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -19,17 +19,19 @@ package kafka.zk
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Properties
 
+import com.yammer.metrics.core.MetricName
 import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.log.LogConfig
+import kafka.metrics.KafkaMetricsGroup
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
 import kafka.security.auth.{Acl, Resource, ResourceType}
-
 import kafka.server.ConfigType
 import kafka.utils._
 import kafka.zookeeper._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException}
@@ -47,7 +49,14 @@ import scala.collection.{Seq, mutable}
  * easier to quickly migrate away from `ZkUtils`. We should revisit this once the migration
is completed and tests are
  * in place. We should also consider whether a monolithic [[kafka.zk.ZkData]] is the way
to go.
  */
-class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends Logging
{
+class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) extends
Logging with KafkaMetricsGroup {
+
+  override def metricName(name: String, metricTags: scala.collection.Map[String, String]):
MetricName = {
+    explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags)
+  }
+
+  private val latencyMetric = newHistogram("ZooKeeperRequestLatencyMs")
+
   import KafkaZkClient._
 
   def createSequentialPersistentPath(path: String, data: String = ""): String = {
@@ -1093,6 +1102,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean)
extends
    * Close the underlying ZooKeeperClient.
    */
   def close(): Unit = {
+    removeMetric("ZooKeeperRequestLatencyMs")
     zooKeeperClient.close()
   }
 
@@ -1243,6 +1253,8 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean)
extends
     while (remainingRequests.nonEmpty) {
       val batchResponses = zooKeeperClient.handleRequests(remainingRequests)
 
+      batchResponses.foreach(response => latencyMetric.update(response.metadata.responseTimeMs))
+
       // Only execute slow path if we find a response with CONNECTIONLOSS
       if (batchResponses.exists(_.resultCode == Code.CONNECTIONLOSS)) {
         val requestResponsePairs = remainingRequests.zip(batchResponses)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b00a9fc7/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index a75181b..a0898da 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLat
 
 import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
 import kafka.utils.Logging
+import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback,
StatCallback, StringCallback, VoidCallback}
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
@@ -42,7 +43,8 @@ import scala.collection.JavaConverters._
 class ZooKeeperClient(connectString: String,
                       sessionTimeoutMs: Int,
                       connectionTimeoutMs: Int,
-                      maxInFlightRequests: Int) extends Logging {
+                      maxInFlightRequests: Int,
+                      time: Time) extends Logging {
   this.logIdent = "[ZooKeeperClient] "
   private val initializationLock = new ReentrantReadWriteLock()
   private val isConnectedOrExpiredLock = new ReentrantLock()
@@ -107,48 +109,51 @@ class ZooKeeperClient(connectString: String,
     // Safe to cast as we always create a response of the right type
     def callback(response: AsyncResponse): Unit = processResponse(response.asInstanceOf[Req#Response])
 
+    def responseMetadata(sendTimeMs: Long) = new ResponseMetadata(sendTimeMs, receivedTimeMs
= time.hiResClockMs())
+
+    val sendTimeMs = time.hiResClockMs()
     request match {
       case ExistsRequest(path, ctx) =>
         zooKeeper.exists(path, shouldWatch(request), new StatCallback {
           override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
-            callback(ExistsResponse(Code.get(rc), path, Option(ctx), stat))
+            callback(ExistsResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs)))
         }, ctx.orNull)
       case GetDataRequest(path, ctx) =>
         zooKeeper.getData(path, shouldWatch(request), new DataCallback {
           override def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte],
stat: Stat): Unit =
-            callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat))
+            callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat, responseMetadata(sendTimeMs)))
         }, ctx.orNull)
       case GetChildrenRequest(path, ctx) =>
         zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback {
           override def processResult(rc: Int, path: String, ctx: Any, children: java.util.List[String],
stat: Stat): Unit =
             callback(GetChildrenResponse(Code.get(rc), path, Option(ctx),
-              Option(children).map(_.asScala).getOrElse(Seq.empty), stat))
+              Option(children).map(_.asScala).getOrElse(Seq.empty), stat, responseMetadata(sendTimeMs)))
         }, ctx.orNull)
       case CreateRequest(path, data, acl, createMode, ctx) =>
         zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback {
           override def processResult(rc: Int, path: String, ctx: Any, name: String): Unit
=
-            callback(CreateResponse(Code.get(rc), path, Option(ctx), name))
+            callback(CreateResponse(Code.get(rc), path, Option(ctx), name, responseMetadata(sendTimeMs)))
         }, ctx.orNull)
       case SetDataRequest(path, data, version, ctx) =>
         zooKeeper.setData(path, data, version, new StatCallback {
           override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
-            callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat))
+            callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs)))
         }, ctx.orNull)
       case DeleteRequest(path, version, ctx) =>
         zooKeeper.delete(path, version, new VoidCallback {
           override def processResult(rc: Int, path: String, ctx: Any): Unit =
-            callback(DeleteResponse(Code.get(rc), path, Option(ctx)))
+            callback(DeleteResponse(Code.get(rc), path, Option(ctx), responseMetadata(sendTimeMs)))
         }, ctx.orNull)
       case GetAclRequest(path, ctx) =>
         zooKeeper.getACL(path, null, new ACLCallback {
           override def processResult(rc: Int, path: String, ctx: Any, acl: java.util.List[ACL],
stat: Stat): Unit = {
             callback(GetAclResponse(Code.get(rc), path, Option(ctx), Option(acl).map(_.asScala).getOrElse(Seq.empty),
-              stat))
+              stat, responseMetadata(sendTimeMs)))
         }}, ctx.orNull)
       case SetAclRequest(path, acl, version, ctx) =>
         zooKeeper.setACL(path, acl.asJava, version, new StatCallback {
           override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
-            callback(SetAclResponse(Code.get(rc), path, Option(ctx), stat))
+            callback(SetAclResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs)))
         }, ctx.orNull)
     }
   }
@@ -404,15 +409,25 @@ sealed abstract class AsyncResponse {
     if (resultCode != Code.OK)
       throw KeeperException.create(resultCode, path)
   }
+
+  def metadata: ResponseMetadata
 }
-case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String)
extends AsyncResponse
-case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any]) extends AsyncResponse
-case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends
AsyncResponse
-case class GetDataResponse(resultCode: Code, path: String, ctx: Option[Any], data: Array[Byte],
stat: Stat) extends AsyncResponse
-case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat)
extends AsyncResponse
-case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any], acl: Seq[ACL],
stat: Stat) extends AsyncResponse
-case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends
AsyncResponse
-case class GetChildrenResponse(resultCode: Code, path: String, ctx: Option[Any], children:
Seq[String], stat: Stat) extends AsyncResponse
+
+case class ResponseMetadata(sendTimeMs: Long, receivedTimeMs: Long) {
+  def responseTimeMs: Long = receivedTimeMs - sendTimeMs // elapsed ms between sending the request and receiving its response
+}
+
+case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String,
metadata: ResponseMetadata) extends AsyncResponse
+case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any], metadata: ResponseMetadata)
extends AsyncResponse
+case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat, metadata:
ResponseMetadata) extends AsyncResponse
+case class GetDataResponse(resultCode: Code, path: String, ctx: Option[Any], data: Array[Byte],
stat: Stat,
+                           metadata: ResponseMetadata) extends AsyncResponse
+case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat,
metadata: ResponseMetadata) extends AsyncResponse
+case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any], acl: Seq[ACL],
stat: Stat,
+                          metadata: ResponseMetadata) extends AsyncResponse
+case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat, metadata:
ResponseMetadata) extends AsyncResponse
+case class GetChildrenResponse(resultCode: Code, path: String, ctx: Option[Any], children:
Seq[String], stat: Stat,
+                               metadata: ResponseMetadata) extends AsyncResponse
 
 class ZooKeeperClientException(message: String) extends RuntimeException(message)
 class ZooKeeperClientExpiredException(message: String) extends ZooKeeperClientException(message)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b00a9fc7/core/src/test/scala/integration/kafka/api/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 26022be..4596887 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -207,7 +207,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
   private def verifyBrokerZkMetrics(server: KafkaServer, topic: String): Unit = {
     // Latency is rounded to milliseconds, so we may need to retry some operations to get
latency > 0.
     val (_, recorded) = TestUtils.computeUntilTrue({
-      servers.head.zkUtils.getLeaderAndIsrForPartition(topic, 0)
+      servers.head.zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
       yammerMetricValue("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Double]
     })(latency => latency > 0.0)
     assertTrue("ZooKeeper latency not recorded", recorded)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b00a9fc7/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 2e56c33..32e0d43 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -22,7 +22,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
-import kafka.zookeeper.{CreateResponse, GetDataResponse, ZooKeeperClientException}
+import kafka.zookeeper.{CreateResponse, GetDataResponse, ResponseMetadata, ZooKeeperClientException}
 import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.Stat
@@ -85,7 +85,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId,
List(brokerId)), controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
-      .andReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null)))
+      .andReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null, ResponseMetadata(0,
0))))
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = true))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
@@ -119,7 +119,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId,
List(brokerId)), controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
-      .andReturn(Seq(CreateResponse(Code.NODEEXISTS, null, Some(partition), null)))
+      .andReturn(Seq(CreateResponse(Code.NODEEXISTS, null, Some(partition), null, ResponseMetadata(0,
0))))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
@@ -154,7 +154,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
       .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
-        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
+        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0,
0))))
 
     val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
@@ -185,7 +185,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
       .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
-        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
+        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0,
0))))
 
     val leaderAndIsrAfterElection = leaderAndIsr.newLeaderAndIsr(otherBrokerId, List(otherBrokerId))
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
@@ -236,7 +236,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
       .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
-        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
+        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0,
0))))
 
     EasyMock.expect(mockZkClient.getLogConfigs(Seq.empty, config.originals()))
       .andReturn((Map(partition.topic -> LogConfig()), Map.empty))
@@ -288,7 +288,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
       .andReturn(Seq(GetDataResponse(Code.NONODE, null, Some(partition),
-        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
+        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0,
0))))
 
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b00a9fc7/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 5d24d79..4d38aac 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -21,7 +21,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
-import kafka.zookeeper.GetDataResponse
+import kafka.zookeeper.{GetDataResponse, ResponseMetadata}
 import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.Stat
@@ -182,7 +182,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
     val updatedLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(updatedLeaderAndIsr,
controllerEpoch)
     EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions)).andReturn(
       Seq(GetDataResponse(Code.OK, null, Some(partition),
-        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
+        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0,
0))))
     EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr),
controllerEpoch))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty,
Map.empty))
     EasyMock.expect(mockTopicDeletionManager.isPartitionToBeDeleted(partition)).andReturn(false)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b00a9fc7/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 9bc7437..f5c9f03 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -43,7 +43,6 @@ import org.apache.log4j.Level
 import org.junit.Assert._
 import org.junit._
 import org.scalatest.junit.JUnitSuite
-import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable

http://git-wip-us.apache.org/repos/asf/kafka/blob/b00a9fc7/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a51acd0..fd6073d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -30,7 +30,7 @@ import kafka.log.{Log, TimestampOffset}
 import kafka.network.RequestChannel
 import kafka.security.auth.Authorizer
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.utils.{MockTime, TestUtils, ZkUtils}
+import kafka.utils.{MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnsupportedVersionException
@@ -60,7 +60,6 @@ class KafkaApisTest {
   private val adminManager = EasyMock.createNiceMock(classOf[AdminManager])
   private val txnCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
   private val controller = EasyMock.createNiceMock(classOf[KafkaController])
-  private val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
   private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
   private val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
   private val metrics = new Metrics()

http://git-wip-us.apache.org/repos/asf/kafka/blob/b00a9fc7/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 4966b10..a437810 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import kafka.controller.ControllerEventManager
 import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.utils.Time
 
 @Category(Array(classOf[IntegrationTest]))
 abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
@@ -56,10 +57,11 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   @Before
   def setUp() {
     zookeeper = new EmbeddedZookeeper()
-    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled()))
+    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
 
-    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests)
-    zkClient = new KafkaZkClient(zooKeeperClient, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled()))
+    val time = Time.SYSTEM
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, time)
+    zkClient = new KafkaZkClient(zooKeeperClient, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), time)
     adminZkClient = new AdminZkClient(zkClient)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b00a9fc7/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 643d502..75842f0 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -26,12 +26,14 @@ import javax.security.auth.login.Configuration
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
+import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.{CreateMode, ZooDefs}
 import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
 import org.junit.{After, Test}
 
 class ZooKeeperClientTest extends ZooKeeperTestHarness {
   private val mockPath = "/foo"
+  private val time = Time.SYSTEM
 
   @After
   override def tearDown() {
@@ -42,18 +44,18 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @Test(expected = classOf[UnknownHostException])
   def testUnresolvableConnectString(): Unit = {
-    new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, Int.MaxValue)
+    new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, Int.MaxValue, time)
   }
 
   @Test(expected = classOf[ZooKeeperClientTimeoutException])
   def testConnectionTimeout(): Unit = {
     zookeeper.shutdown()
-    new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, Int.MaxValue)
+    new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, Int.MaxValue, time)
   }
 
   @Test
   def testConnection(): Unit = {
-    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue)
+    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time)
   }
 
   @Test
@@ -318,7 +320,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
       }
     }
 
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time)
     zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
     zooKeeperClient.reinitialize()
 
@@ -328,7 +330,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testConnectionLossRequestTermination(): Unit = {
     val batchSize = 10
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 2)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 2, time)
     zookeeper.shutdown()
     val requests = (1 to batchSize).map(i => GetDataRequest(s"/$i"))
     val countDownLatch = new CountDownLatch(1)


Mime
View raw message