kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [1/2] kafka git commit: KAFKA-2440; Use `NetworkClient` instead of `SimpleConsumer` to fetch data from replica
Date Fri, 11 Sep 2015 23:08:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6c1957d00 -> 65bf3afe8


http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
new file mode 100644
index 0000000..20a4068
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.integration
+
+import java.io.File
+import java.nio.ByteBuffer
+
+import kafka.admin.AdminUtils
+import kafka.api.{TopicMetadataRequest, TopicMetadataResponse}
+import kafka.client.ClientUtils
+import kafka.cluster.{Broker, BrokerEndPoint}
+import kafka.common.ErrorMapping
+import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
+import kafka.utils.TestUtils
+import kafka.utils.TestUtils._
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.Assert._
+import org.junit.{Test, After, Before}
+
+abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
+  private var server1: KafkaServer = null
+  var brokerEndPoints: Seq[BrokerEndPoint] = null
+  var adHocConfigs: Seq[KafkaConfig] = null
+  val numConfigs: Int = 4
+
+  /* If this is `Some`, SSL will be enabled */
+  protected def trustStoreFile: Option[File]
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    val props = createBrokerConfigs(numConfigs, zkConnect, enableSSL = trustStoreFile.isDefined, trustStoreFile = trustStoreFile)
+    val configs: Seq[KafkaConfig] = props.map(KafkaConfig.fromProps)
+    adHocConfigs = configs.takeRight(configs.size - 1) // Started and stopped by individual test cases
+    server1 = TestUtils.createServer(configs.head)
+    brokerEndPoints = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
+  }
+
+  @After
+  override def tearDown() {
+    server1.shutdown()
+    super.tearDown()
+  }
+
+  @Test
+  def testTopicMetadataRequest {
+    // create topic
+    val topic = "test"
+    AdminUtils.createTopic(zkClient, topic, 1, 1)
+
+    // create a topic metadata request
+    val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
+
+    val serializedMetadataRequest = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes + 2)
+    topicMetadataRequest.writeTo(serializedMetadataRequest)
+    serializedMetadataRequest.rewind()
+    val deserializedMetadataRequest = TopicMetadataRequest.readFrom(serializedMetadataRequest)
+
+    assertEquals(topicMetadataRequest, deserializedMetadataRequest)
+  }
+
+  @Test
+  def testBasicTopicMetadata {
+    // create topic
+    val topic = "test"
+    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
+
+    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
+      2000,0).topicsMetadata
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
+    assertEquals("Expecting metadata for the test topic", "test", topicsMetadata.head.topic)
+    val partitionMetadata = topicsMetadata.head.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+    assertEquals(1, partitionMetadata.head.replicas.size)
+  }
+
+  @Test
+  def testGetAllTopicMetadata {
+    // create topic
+    val topic1 = "testGetAllTopicMetadata1"
+    val topic2 = "testGetAllTopicMetadata2"
+    createTopic(zkClient, topic1, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
+    createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
+
+    // issue metadata request with empty list of topics
+    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testGetAllTopicMetadata",
+      2000, 0).topicsMetadata
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
+    assertEquals(2, topicsMetadata.size)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.last.partitionsMetadata.head.errorCode)
+    val partitionMetadataTopic1 = topicsMetadata.head.partitionsMetadata
+    val partitionMetadataTopic2 = topicsMetadata.last.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic1.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic1.head.partitionId)
+    assertEquals(1, partitionMetadataTopic1.head.replicas.size)
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic2.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic2.head.partitionId)
+    assertEquals(1, partitionMetadataTopic2.head.replicas.size)
+  }
+
+  @Test
+  def testAutoCreateTopic {
+    // auto create topic
+    val topic = "testAutoCreateTopic"
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
+      2000,0).topicsMetadata
+    assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
+    assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
+    assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
+
+    // wait for leader to be elected
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0)
+
+    // retry the metadata for the auto created topic
+    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
+      2000,0).topicsMetadata
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    val partitionMetadata = topicsMetadata.head.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+    assertEquals(1, partitionMetadata.head.replicas.size)
+    assertTrue(partitionMetadata.head.leader.isDefined)
+  }
+
+  @Test
+  def testAutoCreateTopicWithCollision {
+    // auto create topic
+    val topic1 = "testAutoCreate_Topic"
+    val topic2 = "testAutoCreate.Topic"
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1, topic2), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
+      2000,0).topicsMetadata
+    assertEquals("Expecting metadata for 2 topics", 2, topicsMetadata.size)
+    assertEquals("Expecting metadata for topic1", topic1, topicsMetadata.head.topic)
+    assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode)
+    assertEquals("Expecting metadata for topic2", topic2, topicsMetadata(1).topic)
+    assertEquals("Expecting InvalidTopicCode for topic2 metadata", ErrorMapping.InvalidTopicCode, topicsMetadata(1).errorCode)
+
+    // wait for leader to be elected
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0)
+    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 0)
+
+    // retry the metadata for the first auto created topic
+    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
+      2000,0).topicsMetadata
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    var partitionMetadata = topicsMetadata.head.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+    assertEquals(1, partitionMetadata.head.replicas.size)
+    assertTrue(partitionMetadata.head.leader.isDefined)
+  }
+
+  private def checkIsr(servers: Seq[KafkaServer]): Unit = {
+    val activeBrokers: Seq[KafkaServer] = servers.filter(x => x.brokerState.currentState != NotRunning.state)
+    val expectedIsr: Seq[BrokerEndPoint] = activeBrokers.map(
+      x => new BrokerEndPoint(x.config.brokerId,
+                              if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
+                              x.boundPort())
+    )
+
+    // Assert that topic metadata at new brokers is updated correctly
+    activeBrokers.foreach(x => {
+      var metadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
+      waitUntilTrue(() => {
+        metadata = ClientUtils.fetchTopicMetadata(
+                                Set.empty,
+                                Seq(new BrokerEndPoint(
+                                                  x.config.brokerId,
+                                                  if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
+                                                  x.boundPort())),
+                                "TopicMetadataTest-testBasicTopicMetadata",
+                                2000, 0)
+        metadata.topicsMetadata.nonEmpty &&
+          metadata.topicsMetadata.head.partitionsMetadata.nonEmpty &&
+          expectedIsr == metadata.topicsMetadata.head.partitionsMetadata.head.isr
+      },
+        "Topic metadata is not correctly updated for broker " + x + ".\n" +
+        "Expected ISR: " + expectedIsr + "\n" +
+        "Actual ISR  : " + (if (metadata.topicsMetadata.nonEmpty &&
+                                metadata.topicsMetadata.head.partitionsMetadata.nonEmpty)
+                              metadata.topicsMetadata.head.partitionsMetadata.head.isr
+                            else
+                              ""), 6000L)
+    })
+  }
+
+  @Test
+  def testIsrAfterBrokerShutDownAndJoinsBack {
+    val numBrokers = 2 //just 2 brokers are enough for the test
+
+    // start adHoc brokers
+    val adHocServers = adHocConfigs.take(numBrokers - 1).map(p => createServer(p))
+    val allServers: Seq[KafkaServer] = Seq(server1) ++ adHocServers
+
+    // create topic
+    val topic: String = "test"
+    AdminUtils.createTopic(zkClient, topic, 1, numBrokers)
+
+    // shutdown a broker
+    adHocServers.last.shutdown()
+    adHocServers.last.awaitShutdown()
+
+    // startup a broker
+    adHocServers.last.startup()
+
+    // check metadata is still correct and updated at all brokers
+    checkIsr(allServers)
+
+    // shutdown adHoc brokers
+    adHocServers.map(p => p.shutdown())
+  }
+
+  private def checkMetadata(servers: Seq[KafkaServer], expectedBrokersCount: Int): Unit = {
+    var topicMetadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
+
+    // Get topic metadata from old broker
+    // Wait for metadata to get updated by checking metadata from a new broker
+    waitUntilTrue(() => {
+    topicMetadata = ClientUtils.fetchTopicMetadata(
+      Set.empty, brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
+    topicMetadata.brokers.size == expectedBrokersCount},
+      "Alive brokers list is not correctly propagated by coordinator to brokers"
+    )
+
+    // Assert that topic metadata at new brokers is updated correctly
+    servers.filter(x => x.brokerState.currentState != NotRunning.state).foreach(x =>
+      waitUntilTrue(() =>
+        topicMetadata == ClientUtils.fetchTopicMetadata(
+          Set.empty,
+          Seq(new Broker(x.config.brokerId,
+            x.config.hostName,
+            x.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)),
+          "TopicMetadataTest-testBasicTopicMetadata",
+          2000, 0), "Topic metadata is not correctly updated"))
+  }
+
+
+  @Test
+  def testAliveBrokerListWithNoTopics {
+    checkMetadata(Seq(server1), 1)
+  }
+
+  @Test
+  def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup {
+    var adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
+
+    checkMetadata(adHocServers, numConfigs - 1)
+
+    // Add a broker
+    adHocServers = adHocServers ++ Seq(createServer(adHocConfigs.head))
+
+    checkMetadata(adHocServers, numConfigs)
+    adHocServers.map(p => p.shutdown())
+  }
+
+
+  @Test
+  def testAliveBrokersListWithNoTopicsAfterABrokerShutdown {
+    val adHocServers = adHocConfigs.map(p => createServer(p))
+
+    checkMetadata(adHocServers, numConfigs)
+
+    // Shutdown a broker
+    adHocServers.last.shutdown()
+    adHocServers.last.awaitShutdown()
+
+    checkMetadata(adHocServers, numConfigs - 1)
+
+    adHocServers.map(p => p.shutdown())
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala
new file mode 100644
index 0000000..176d251
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+class PlaintextTopicMetadataTest extends BaseTopicMetadataTest {
+  protected def trustStoreFile = None
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala
new file mode 100644
index 0000000..5ff9f35
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import java.io.File
+
+class SslTopicMetadataTest extends BaseTopicMetadataTest {
+  protected def trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
deleted file mode 100644
index 5e32d59..0000000
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ /dev/null
@@ -1,291 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.integration
-
-import java.nio.ByteBuffer
-
-import kafka.admin.AdminUtils
-import kafka.api.{TopicMetadataRequest, TopicMetadataResponse}
-import kafka.client.ClientUtils
-import kafka.cluster.{Broker, BrokerEndPoint}
-import kafka.common.ErrorMapping
-import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
-import kafka.utils.TestUtils
-import kafka.utils.TestUtils._
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.Assert._
-import org.junit.{Test, After, Before}
-
-class TopicMetadataTest extends ZooKeeperTestHarness {
-  private var server1: KafkaServer = null
-  var brokerEndPoints: Seq[BrokerEndPoint] = null
-  var adHocConfigs: Seq[KafkaConfig] = null
-  val numConfigs: Int = 4
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    val props = createBrokerConfigs(numConfigs, zkConnect)
-    val configs: Seq[KafkaConfig] = props.map(KafkaConfig.fromProps)
-  adHocConfigs = configs.takeRight(configs.size - 1) // Started and stopped by individual test cases
-    server1 = TestUtils.createServer(configs.head)
-    brokerEndPoints = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
-  }
-
-  @After
-  override def tearDown() {
-    server1.shutdown()
-    super.tearDown()
-  }
-
-  @Test
-  def testTopicMetadataRequest {
-    // create topic
-    val topic = "test"
-    AdminUtils.createTopic(zkClient, topic, 1, 1)
-
-    // create a topic metadata request
-    val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
-
-    val serializedMetadataRequest = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes + 2)
-    topicMetadataRequest.writeTo(serializedMetadataRequest)
-    serializedMetadataRequest.rewind()
-    val deserializedMetadataRequest = TopicMetadataRequest.readFrom(serializedMetadataRequest)
-
-    assertEquals(topicMetadataRequest, deserializedMetadataRequest)
-  }
-
-  @Test
-  def testBasicTopicMetadata {
-    // create topic
-    val topic = "test"
-    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
-
-    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
-      2000,0).topicsMetadata
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
-    assertEquals("Expecting metadata for the test topic", "test", topicsMetadata.head.topic)
-    var partitionMetadata = topicsMetadata.head.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(1, partitionMetadata.head.replicas.size)
-  }
-
-  @Test
-  def testGetAllTopicMetadata {
-    // create topic
-    val topic1 = "testGetAllTopicMetadata1"
-    val topic2 = "testGetAllTopicMetadata2"
-    createTopic(zkClient, topic1, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
-    createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
-
-    // issue metadata request with empty list of topics
-    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testGetAllTopicMetadata",
-      2000, 0).topicsMetadata
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
-    assertEquals(2, topicsMetadata.size)
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
-    assertEquals(ErrorMapping.NoError, topicsMetadata.last.partitionsMetadata.head.errorCode)
-    val partitionMetadataTopic1 = topicsMetadata.head.partitionsMetadata
-    val partitionMetadataTopic2 = topicsMetadata.last.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic1.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic1.head.partitionId)
-    assertEquals(1, partitionMetadataTopic1.head.replicas.size)
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic2.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic2.head.partitionId)
-    assertEquals(1, partitionMetadataTopic2.head.replicas.size)
-  }
-
-  @Test
-  def testAutoCreateTopic {
-    // auto create topic
-    val topic = "testAutoCreateTopic"
-    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
-      2000,0).topicsMetadata
-    assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
-    assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
-    assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
-
-    // wait for leader to be elected
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0)
-
-    // retry the metadata for the auto created topic
-    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
-      2000,0).topicsMetadata
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
-    var partitionMetadata = topicsMetadata.head.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(1, partitionMetadata.head.replicas.size)
-    assertTrue(partitionMetadata.head.leader.isDefined)
-  }
-
-  @Test
-  def testAutoCreateTopicWithCollision {
-    // auto create topic
-    val topic1 = "testAutoCreate_Topic"
-    val topic2 = "testAutoCreate.Topic"
-    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1, topic2), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
-      2000,0).topicsMetadata
-    assertEquals("Expecting metadata for 2 topics", 2, topicsMetadata.size)
-    assertEquals("Expecting metadata for topic1", topic1, topicsMetadata.head.topic)
-    assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode)
-    assertEquals("Expecting metadata for topic2", topic2, topicsMetadata(1).topic)
-    assertEquals("Expecting InvalidTopicCode for topic2 metadata", ErrorMapping.InvalidTopicCode, topicsMetadata(1).errorCode)
-
-    // wait for leader to be elected
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0)
-    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 0)
-
-    // retry the metadata for the first auto created topic
-    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
-      2000,0).topicsMetadata
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
-    var partitionMetadata = topicsMetadata.head.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(1, partitionMetadata.head.replicas.size)
-    assertTrue(partitionMetadata.head.leader.isDefined)
-  }
-
-  private def checkIsr(servers: Seq[KafkaServer]): Unit = {
-    val activeBrokers: Seq[KafkaServer] = servers.filter(x => x.brokerState.currentState != NotRunning.state)
-    val expectedIsr: Seq[BrokerEndPoint] = activeBrokers.map(
-      x => new BrokerEndPoint(x.config.brokerId,
-                              if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
-                              x.boundPort())
-    )
-
-    // Assert that topic metadata at new brokers is updated correctly
-    activeBrokers.foreach(x => {
-      var metadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
-      waitUntilTrue(() => {
-        metadata = ClientUtils.fetchTopicMetadata(
-                                Set.empty,
-                                Seq(new BrokerEndPoint(
-                                                  x.config.brokerId,
-                                                  if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
-                                                  x.boundPort())),
-                                "TopicMetadataTest-testBasicTopicMetadata",
-                                2000, 0)
-        metadata.topicsMetadata.nonEmpty &&
-          metadata.topicsMetadata.head.partitionsMetadata.nonEmpty &&
-          expectedIsr == metadata.topicsMetadata.head.partitionsMetadata.head.isr
-      },
-        "Topic metadata is not correctly updated for broker " + x + ".\n" +
-        "Expected ISR: " + expectedIsr + "\n" +
-        "Actual ISR  : " + (if (metadata.topicsMetadata.nonEmpty &&
-                                metadata.topicsMetadata.head.partitionsMetadata.nonEmpty)
-                              metadata.topicsMetadata.head.partitionsMetadata.head.isr
-                            else
-                              ""), 6000L)
-    })
-  }
-
-  @Test
-  def testIsrAfterBrokerShutDownAndJoinsBack {
-    val numBrokers = 2 //just 2 brokers are enough for the test
-
-    // start adHoc brokers
-    val adHocServers = adHocConfigs.take(numBrokers - 1).map(p => createServer(p))
-    val allServers: Seq[KafkaServer] = Seq(server1) ++ adHocServers
-
-    // create topic
-    val topic: String = "test"
-    AdminUtils.createTopic(zkClient, topic, 1, numBrokers)
-
-    // shutdown a broker
-    adHocServers.last.shutdown()
-    adHocServers.last.awaitShutdown()
-
-    // startup a broker
-    adHocServers.last.startup()
-
-    // check metadata is still correct and updated at all brokers
-    checkIsr(allServers)
-
-    // shutdown adHoc brokers
-    adHocServers.map(p => p.shutdown())
-  }
-
-  private def checkMetadata(servers: Seq[KafkaServer], expectedBrokersCount: Int): Unit = {
-    var topicMetadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
-
-    // Get topic metadata from old broker
-    // Wait for metadata to get updated by checking metadata from a new broker
-    waitUntilTrue(() => {
-    topicMetadata = ClientUtils.fetchTopicMetadata(
-      Set.empty, brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
-    topicMetadata.brokers.size == expectedBrokersCount},
-      "Alive brokers list is not correctly propagated by coordinator to brokers"
-    )
-
-    // Assert that topic metadata at new brokers is updated correctly
-    servers.filter(x => x.brokerState.currentState != NotRunning.state).foreach(x =>
-      waitUntilTrue(() =>
-        topicMetadata == ClientUtils.fetchTopicMetadata(
-          Set.empty,
-          Seq(new Broker(x.config.brokerId,
-            x.config.hostName,
-            x.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)),
-          "TopicMetadataTest-testBasicTopicMetadata",
-          2000, 0), "Topic metadata is not correctly updated"))
-  }
-
-
-  @Test
-  def testAliveBrokerListWithNoTopics {
-    checkMetadata(Seq(server1), 1)
-  }
-
-  @Test
-  def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup {
-    var adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
-
-    checkMetadata(adHocServers, numConfigs - 1)
-
-    // Add a broker
-    adHocServers = adHocServers ++ Seq(createServer(adHocConfigs.head))
-
-    checkMetadata(adHocServers, numConfigs)
-    adHocServers.map(p => p.shutdown())
-  }
-
-
-  @Test
-  def testAliveBrokersListWithNoTopicsAfterABrokerShutdown {
-    val adHocServers = adHocConfigs.map(p => createServer(p))
-
-    checkMetadata(adHocServers, numConfigs)
-
-    // Shutdown a broker
-    adHocServers.last.shutdown()
-    adHocServers.last.awaitShutdown()
-
-    checkMetadata(adHocServers, numConfigs - 1)
-
-    adHocServers.map(p => p.shutdown())
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
new file mode 100644
index 0000000..b744b94
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import java.io.File
+
+import org.junit.{Test, After, Before}
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.TestUtils._
+import kafka.producer.KeyedMessage
+import kafka.serializer.StringEncoder
+import kafka.utils.{TestUtils}
+import kafka.common._
+
+abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness  {
+  var brokers: Seq[KafkaServer] = null
+  val topic1 = "foo"
+  val topic2 = "bar"
+
+  /* If this is `Some`, SSL will be enabled */
+  protected def trustStoreFile: Option[File]
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    brokers = createBrokerConfigs(2, zkConnect, enableControlledShutdown = false, enableSSL = trustStoreFile.isDefined, trustStoreFile = trustStoreFile)
+      .map(KafkaConfig.fromProps)
+      .map(TestUtils.createServer(_))
+  }
+
+  @After
+  override def tearDown() {
+    brokers.foreach(_.shutdown())
+    super.tearDown()
+  }
+
+  @Test
+  def testReplicaFetcherThread() {
+    val partition = 0
+    val testMessageList1 = List("test1", "test2", "test3", "test4")
+    val testMessageList2 = List("test5", "test6", "test7", "test8")
+
+    // create a topic and partition and await leadership
+    for (topic <- List(topic1,topic2)) {
+      createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = brokers)
+    }
+
+    // send test messages to leader
+    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(brokers),
+                                                            encoder = classOf[StringEncoder].getName,
+                                                            keyEncoder = classOf[StringEncoder].getName)
+    val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m))
+    producer.send(messages:_*)
+    producer.close()
+
+    def logsMatch(): Boolean = {
+      var result = true
+      for (topic <- List(topic1, topic2)) {
+        val topicAndPart = TopicAndPartition(topic, partition)
+        val expectedOffset = brokers.head.getLogManager().getLog(topicAndPart).get.logEndOffset
+        result = result && expectedOffset > 0 && brokers.forall { item =>
+          (expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset)
+        }
+      }
+      result
+    }
+    waitUntilTrue(logsMatch, "Broker logs should be identical")
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 0c6d23d..bab81df 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -19,6 +19,7 @@ package kafka.server
 import kafka.log._
 import java.io.File
 import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.common.metrics.Metrics
 import org.easymock.EasyMock
 import org.junit._
 import org.junit.Assert._
@@ -26,6 +27,7 @@ import kafka.common._
 import kafka.cluster.Replica
 import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, CoreUtils}
 import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.kafka.common.utils.{MockTime => JMockTime}
 
 class HighwatermarkPersistenceTest {
 
@@ -53,7 +55,8 @@ class HighwatermarkPersistenceTest {
     val scheduler = new KafkaScheduler(2)
     scheduler.startup
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0), new AtomicBoolean(false))
+    val replicaManager = new ReplicaManager(configs.head, new Metrics, new MockTime, new JMockTime, zkClient, scheduler,
+      logManagers(0), new AtomicBoolean(false))
     replicaManager.startup()
     replicaManager.checkpointHighWatermarks()
     var fooPartition0Hw = hwmFor(replicaManager, topic, 0)
@@ -90,7 +93,8 @@ class HighwatermarkPersistenceTest {
     val scheduler = new KafkaScheduler(2)
     scheduler.startup
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0), new AtomicBoolean(false))
+    val replicaManager = new ReplicaManager(configs.head, new Metrics, new MockTime(), new JMockTime, zkClient,
+      scheduler, logManagers(0), new AtomicBoolean(false))
     replicaManager.startup()
     replicaManager.checkpointHighWatermarks()
     var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 977b29a..26910a8 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -18,6 +18,7 @@ package kafka.server
 
 import java.util.Properties
 
+import org.apache.kafka.common.metrics.Metrics
 import org.junit.{Test, Before, After}
 import collection.mutable.HashMap
 import collection.mutable.Map
@@ -28,6 +29,7 @@ import org.junit.Assert._
 import kafka.utils._
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.message.MessageSet
+import org.apache.kafka.common.utils.{MockTime => JMockTime}
 
 
 class IsrExpirationTest {
@@ -43,12 +45,14 @@ class IsrExpirationTest {
   val topic = "foo"
 
   val time = new MockTime
+  val jTime = new JMockTime
+  val metrics = new Metrics
 
   var replicaManager: ReplicaManager = null
 
   @Before
   def setUp() {
-    replicaManager = new ReplicaManager(configs.head, time, null, null, null, new AtomicBoolean(false))
+    replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, null, null, null, new AtomicBoolean(false))
   }
 
   @After
@@ -179,4 +183,4 @@ class IsrExpirationTest {
       new Replica(config.brokerId, partition, time)
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala
new file mode 100644
index 0000000..871e49b
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+class PlaintextReplicaFetchTest extends BaseReplicaFetchTest {
+  protected def trustStoreFile = None
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
deleted file mode 100644
index e40bf3b..0000000
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.server
-
-import org.junit.{Test, After, Before}
-import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.TestUtils._
-import kafka.producer.KeyedMessage
-import kafka.serializer.StringEncoder
-import kafka.utils.{TestUtils}
-import kafka.common._
-
-class ReplicaFetchTest extends ZooKeeperTestHarness  {
-  var brokers: Seq[KafkaServer] = null
-  val topic1 = "foo"
-  val topic2 = "bar"
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    brokers = createBrokerConfigs(2, zkConnect, false)
-      .map(KafkaConfig.fromProps)
-      .map(config => TestUtils.createServer(config))
-  }
-
-  @After
-  override def tearDown() {
-    brokers.foreach(_.shutdown())
-    super.tearDown()
-  }
-
-  @Test
-  def testReplicaFetcherThread() {
-    val partition = 0
-    val testMessageList1 = List("test1", "test2", "test3", "test4")
-    val testMessageList2 = List("test5", "test6", "test7", "test8")
-
-    // create a topic and partition and await leadership
-    for (topic <- List(topic1,topic2)) {
-      createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = brokers)
-    }
-
-    // send test messages to leader
-    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(brokers),
-                                                            encoder = classOf[StringEncoder].getName,
-                                                            keyEncoder = classOf[StringEncoder].getName)
-    val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m))
-    producer.send(messages:_*)
-    producer.close()
-
-    def logsMatch(): Boolean = {
-      var result = true
-      for (topic <- List(topic1, topic2)) {
-        val topicAndPart = TopicAndPartition(topic, partition)
-        val expectedOffset = brokers.head.getLogManager().getLog(topicAndPart).get.logEndOffset
-        result = result && expectedOffset > 0 && brokers.forall { item =>
-          (expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset)
-        }
-      }
-      result
-    }
-    waitUntilTrue(logsMatch, "Broker logs should be identical")
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 3770cb4..f260a71 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -24,7 +24,9 @@ import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.File
 
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.{MockTime => JMockTime}
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.junit.Test
@@ -42,7 +44,9 @@ class ReplicaManagerTest {
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
-    val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
+    val jTime = new JMockTime
+    val rm = new ReplicaManager(config, new Metrics, time, jTime, zkClient, new MockScheduler(time), mockLogMgr,
+      new AtomicBoolean(false))
     val partition = rm.getOrCreatePartition(topic, 1)
     partition.getOrCreateReplica(1)
     rm.checkpointHighWatermarks()
@@ -59,7 +63,9 @@ class ReplicaManagerTest {
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
-    val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
+    val jTime = new JMockTime
+    val rm = new ReplicaManager(config, new Metrics, time, jTime, zkClient, new MockScheduler(time), mockLogMgr,
+      new AtomicBoolean(false))
     val partition = rm.getOrCreatePartition(topic, 1)
     partition.getOrCreateReplica(1)
     rm.checkpointHighWatermarks()
@@ -75,7 +81,9 @@ class ReplicaManagerTest {
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
-    val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
+    val jTime = new JMockTime
+    val rm = new ReplicaManager(config, new Metrics, time, jTime, zkClient, new MockScheduler(time), mockLogMgr,
+      new AtomicBoolean(false))
     val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest)
     def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) = {
       assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code)

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index ba584a2..884ec06 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -22,6 +22,8 @@ import kafka.cluster.Replica
 import kafka.common.TopicAndPartition
 import kafka.log.Log
 import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.{MockTime => JMockTime}
 import org.junit.{Test, After, Before}
 
 import java.util.{Properties, Collections}
@@ -46,6 +48,8 @@ class SimpleFetchTest {
 
   // set the replica manager with the partition
   val time = new MockTime
+  val jTime = new JMockTime
+  val metrics = new Metrics
   val leaderLEO = 20L
   val followerLEO = 15L
   val partitionHW = 5
@@ -94,7 +98,8 @@ class SimpleFetchTest {
     EasyMock.replay(logManager)
 
     // create the replica manager
-    replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler, logManager, new AtomicBoolean(false))
+    replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, zkClient, scheduler, logManager,
+      new AtomicBoolean(false))
 
     // add the partition with two replicas, both in ISR
     val partition = replicaManager.getOrCreatePartition(topic, partitionId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala
new file mode 100644
index 0000000..9858052
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.File
+
+class SslReplicaFetchTest extends BaseReplicaFetchTest {
+  protected def trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+}


Mime
View raw message