kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Add unit tests to the ReassignPartitionsCommand
Date Fri, 06 Jan 2017 11:59:03 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2e1bcf661 -> b7da847f6


MINOR: Add unit tests to the ReassignPartitionsCommand

Adds a bunch of unit tests to the assignment command.
Moves the Rack aware test into its own class as it makes use of ZooKeeperTestHarness and slows
everything else down.

Author: Ben Stopford <benstopford@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #1950 from benstopford/os-rebalance-extra-unit-testing


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b7da847f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b7da847f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b7da847f

Branch: refs/heads/trunk
Commit: b7da847f6ae5786925d65a673b7e80f2de8fe389
Parents: 2e1bcf6
Author: Ben Stopford <benstopford@gmail.com>
Authored: Fri Jan 6 11:03:23 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Jan 6 11:23:03 2017 +0000

----------------------------------------------------------------------
 .../kafka/admin/ReassignPartitionsCommand.scala |  16 +-
 .../ReassignPartitionsIntegrationTest.scala     |  46 ++++
 .../admin/ReassignPartitionsCommandTest.scala   | 227 ++++++++++++++++---
 .../test/scala/unit/kafka/utils/TestUtils.scala |   6 +
 4 files changed, 255 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b7da847f/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 4fcd548..dc707e5 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -74,7 +74,7 @@ object ReassignPartitionsCommand extends Logging {
     removeThrottle(zkUtils, partitionsToBeReassigned, reassignedPartitionsStatus)
   }
 
-  private def removeThrottle(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition,
Seq[Int]], reassignedPartitionsStatus: Map[TopicAndPartition, ReassignmentStatus]): Unit =
{
+  private[admin] def removeThrottle(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition,
Seq[Int]], reassignedPartitionsStatus: Map[TopicAndPartition, ReassignmentStatus], admin:
AdminUtilities = AdminUtils): Unit = {
     var changed = false
 
     //If all partitions have completed remove the throttle
@@ -82,11 +82,11 @@ object ReassignPartitionsCommand extends Logging {
       //Remove the throttle limit from all brokers in the cluster
       //(as we no longer know which specific brokers were involved in the move)
       for (brokerId <- zkUtils.getAllBrokersInCluster().map(_.id)) {
-        val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Broker, brokerId.toString)
+        val configs = admin.fetchEntityConfig(zkUtils, ConfigType.Broker, brokerId.toString)
         // bitwise OR as we don't want to short-circuit
         if (configs.remove(DynamicConfig.Broker.LeaderReplicationThrottledRateProp) != null
           | configs.remove(DynamicConfig.Broker.FollowerReplicationThrottledRateProp) !=
null){
-          AdminUtils.changeBrokerConfig(zkUtils, Seq(brokerId), configs)
+          admin.changeBrokerConfig(zkUtils, Seq(brokerId), configs)
           changed = true
         }
       }
@@ -94,11 +94,11 @@ object ReassignPartitionsCommand extends Logging {
       //Remove the list of throttled replicas from all topics with partitions being moved
       val topics = partitionsToBeReassigned.keySet.map(tp => tp.topic).toSeq.distinct
       for (topic <- topics) {
-        val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+        val configs = admin.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
         // bitwise OR as we don't want to short-circuit
         if (configs.remove(LogConfig.LeaderReplicationThrottledReplicasProp) != null
           | configs.remove(LogConfig.FollowerReplicationThrottledReplicasProp) != null){
-          AdminUtils.changeTopicConfig(zkUtils, topic, configs)
+          admin.changeTopicConfig(zkUtils, topic, configs)
           changed = true
         }
       }
@@ -303,7 +303,7 @@ object ReassignPartitionsCommand extends Logging {
   }
 }
 
-class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicAndPartition,
Seq[Int]])
+class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicAndPartition,
Seq[Int]], admin: AdminUtilities = AdminUtils)
   extends Logging {
 
   def existingAssignment(): Map[TopicAndPartition, Seq[Int]] = {
@@ -329,10 +329,10 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment:
Map[TopicA
       val brokers = (existingBrokers ++ proposedBrokers).distinct
 
       for (id <- brokers) {
-        val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Broker, id.toString)
+        val configs = admin.fetchEntityConfig(zkUtils, ConfigType.Broker, id.toString)
         configs.put(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString)
         configs.put(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, throttle.toString)
-        AdminUtils.changeBrokerConfig(zkUtils, Seq(id), configs)
+        admin.changeBrokerConfig(zkUtils, Seq(id), configs)
       }
       println(s"The throttle limit was set to $throttle B/s")
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7da847f/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
new file mode 100644
index 0000000..47d487e
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -0,0 +1,46 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.admin
+
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.Test
+
+class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness with RackAwareTest {
+
+  @Test
+  def testRackAwareReassign() {
+    val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1",
4 -> "rack3", 5 -> "rack3")
+    TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkUtils)
+
+    val numPartitions = 18
+    val replicationFactor = 3
+
+    // create a non rack aware assignment topic first
+    val createOpts = new kafka.admin.TopicCommand.TopicCommandOptions(Array(
+      "--partitions", numPartitions.toString,
+      "--replication-factor", replicationFactor.toString,
+      "--disable-rack-aware",
+      "--topic", "foo"))
+    kafka.admin.TopicCommand.createTopic(zkUtils, createOpts)
+
+    val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
+    val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkUtils,
+      rackInfo.keys.toSeq.sorted, topicJson, disableRackAware = false)
+
+    val assignment = proposedAssignment map { case (topicPartition, replicas) =>
+      (topicPartition.partition, replicas)
+    }
+    checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7da847f/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 90a354e..f2a2362 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -18,43 +18,26 @@ package kafka.admin
 
 import java.util.Properties
 
+import kafka.cluster.Broker
 import kafka.common.TopicAndPartition
+import kafka.log.LogConfig
 import kafka.log.LogConfig._
-import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.{ConfigType, DynamicConfig}
+import kafka.utils.CoreUtils._
+import kafka.utils.TestUtils._
+import kafka.utils.{CoreUtils, Logging, ZkUtils}
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.easymock.EasyMock._
+import org.easymock.{Capture, CaptureType, EasyMock}
 import org.junit.{Before, Test}
-import org.junit.Assert.assertEquals
+import org.junit.Assert.{assertEquals, assertNull, fail}
+import scala.collection.{Seq, mutable}
+import scala.collection.JavaConversions._
 
-class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest
{
+class ReassignPartitionsCommandTest extends Logging {
   var calls = 0
 
   @Test
-  def testRackAwareReassign() {
-    val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1",
4 -> "rack3", 5 -> "rack3")
-    TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkUtils)
-
-    val numPartitions = 18
-    val replicationFactor = 3
-
-    // create a non rack aware assignment topic first
-    val createOpts = new kafka.admin.TopicCommand.TopicCommandOptions(Array(
-      "--partitions", numPartitions.toString,
-      "--replication-factor", replicationFactor.toString,
-      "--disable-rack-aware",
-      "--topic", "foo"))
-    kafka.admin.TopicCommand.createTopic(zkUtils, createOpts)
-
-    val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
-    val (proposedAssignment, _) = ReassignPartitionsCommand.generateAssignment(zkUtils,
-      rackInfo.keys.toSeq.sorted, topicJson, disableRackAware = false)
-
-    val assignment = proposedAssignment map { case (topicPartition, replicas) =>
-      (topicPartition.partition, replicas)
-    }
-    checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor)
-  }
-
-  @Test
   def shouldFindMovingReplicas() {
     val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
     val assigner = new ReassignPartitionsCommand(null, null)
@@ -157,7 +140,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with
Logging wi
           case "topic2" =>
             assertEquals("0:100,1:100", configChange.get(FollowerReplicationThrottledReplicasProp))
             assertEquals("0:101,0:102,1:101,1:102", configChange.get(LeaderReplicationThrottledReplicasProp))
-          case _ => fail()
+          case _ => fail(s"Unexpected topic $topic")
         }
         calls += 1
       }
@@ -199,7 +182,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with
Logging wi
     val proposed = Map(TopicAndPartition("topic1", 0) -> Seq(101, 102), control)
 
     //Given partition there are existing properties
-    val existingProperties = CoreUtils.propsWith("some-key", "some-value")
+    val existingProperties = propsWith("some-key", "some-value")
 
     //Then the dummy property should still be there
     val mock = new TestAdminUtils {
@@ -218,8 +201,188 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with
Logging wi
     assertEquals(1, calls)
   }
 
+  @Test
+  def shouldSetQuotaLimit(): Unit = {
+    //Given
+    val existing = mutable.Map(TopicAndPartition("topic1", 0) -> Seq(100, 101))
+    val proposed = mutable.Map(TopicAndPartition("topic1", 0) -> Seq(101, 102))
+
+    //Setup
+    val zk = stubZK(existing)
+    val admin = createMock(classOf[AdminUtilities])
+    val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
+    val assigner = new ReassignPartitionsCommand(zk, proposed, admin)
+    expect(admin.fetchEntityConfig(is(zk), anyString(), anyString())).andStubReturn(new Properties)
+    expect(admin.changeBrokerConfig(is(zk), anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
+    replay(admin)
+
+    //When
+    assigner.maybeLimit(1000)
+
+    //Then
+    for (actual <- propsCapture.getValues) {
+      assertEquals("1000", actual.getProperty(DynamicConfig.Broker.LeaderReplicationThrottledRateProp))
+      assertEquals("1000", actual.getProperty(DynamicConfig.Broker.FollowerReplicationThrottledRateProp))
+    }
+    assertEquals(3, propsCapture.getValues.size) //3 brokers
+  }
+
+  @Test
+  def shouldUpdateQuotaLimit(): Unit = {
+    //Given
+    val existing = mutable.Map(TopicAndPartition("topic1", 0) -> Seq(100, 101))
+    val proposed = mutable.Map(TopicAndPartition("topic1", 0) -> Seq(101, 102))
+
+    //Setup
+    val zk = stubZK(existing)
+    val admin = createMock(classOf[AdminUtilities])
+    val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
+    val assigner = new ReassignPartitionsCommand(zk, proposed, admin)
+    expect(admin.changeBrokerConfig(is(zk), anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
+
+    //Expect the existing broker config to be changed from 10/100 to 1000
+    val existingConfigs = CoreUtils.propsWith(
+      (DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "10"),
+      (DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "100")
+    )
+    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Broker), is("100"))).andReturn(copyOf(existingConfigs))
+    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Broker), is("101"))).andReturn(copyOf(existingConfigs))
+    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Broker), is("102"))).andReturn(copyOf(existingConfigs))
+    replay(admin)
+
+    //When
+    assigner.maybeLimit(1000)
+
+    //Then
+    for (actual <- propsCapture.getValues) {
+      assertEquals("1000", actual.getProperty(DynamicConfig.Broker.LeaderReplicationThrottledRateProp))
+      assertEquals("1000", actual.getProperty(DynamicConfig.Broker.FollowerReplicationThrottledRateProp))
+    }
+    assertEquals(3, propsCapture.getValues.size) //three brokers
+  }
+
+  @Test
+  def shouldNotOverwriteExistingPropertiesWhenLimitIsAdded(): Unit = {
+    //Given
+    val existing = mutable.Map(TopicAndPartition("topic1", 0) -> Seq(100, 101))
+    val proposed = mutable.Map(TopicAndPartition("topic1", 0) -> Seq(101, 102))
+
+    //Setup
+    val zk = stubZK(existing)
+    val admin = createMock(classOf[AdminUtilities])
+    val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
+    val assigner = new ReassignPartitionsCommand(zk, proposed, admin)
+    expect(admin.changeBrokerConfig(is(zk), anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
+
+    //Given there is some existing config
+    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Broker), anyString())).andReturn(
+      propsWith("useful.key", "useful.value")).atLeastOnce()
+
+    replay(admin)
+
+    //When
+    assigner.maybeLimit(1000)
+
+    //Then other property remains
+    for (actual <- propsCapture.getValues) {
+      assertEquals("useful.value", actual.getProperty("useful.key"))
+      assertEquals("1000", actual.getProperty(DynamicConfig.Broker.LeaderReplicationThrottledRateProp))
+      assertEquals("1000", actual.getProperty(DynamicConfig.Broker.FollowerReplicationThrottledRateProp))
+    }
+    assertEquals(3, propsCapture.getValues.size) //3 brokers
+  }
+
+  @Test
+  def shouldRemoveThrottleLimitFromAllBrokers(): Unit = {
+    //Given 3 brokers, but with assignment only covering 2 of them
+    val brokers = Seq(100, 101, 102)
+    val proposed = mutable.Map(TopicAndPartition("topic1", 0) -> Seq(100, 101))
+    val status = mutable.Map(TopicAndPartition("topic1", 0) -> ReassignmentCompleted)
+    val existingBrokerConfigs = propsWith(
+      (DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "10"),
+      (DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "100"),
+      ("useful.key", "value")
+    )
+
+    //Setup
+    val zk = stubZK(brokers = brokers)
+    val admin = createMock(classOf[AdminUtilities])
+    val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
+    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Topic), anyString())).andStubReturn(new
Properties)
+    expect(admin.changeBrokerConfig(is(zk), anyObject().asInstanceOf[Seq[Int]], capture(propsCapture))).anyTimes()
+    //Stub each invocation as EasyMock caches the return value which can be mutated
+    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Broker), is("100"))).andReturn(copyOf(existingBrokerConfigs))
+    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Broker), is("101"))).andReturn(copyOf(existingBrokerConfigs))
+    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Broker), is("102"))).andReturn(copyOf(existingBrokerConfigs))
+    replay(admin)
+
+    //When
+    ReassignPartitionsCommand.removeThrottle(zk, proposed, status, admin)
+
+    //Then props should have gone (dummy remains)
+    for (capture <- propsCapture.getValues) {
+      assertEquals("value", capture.get("useful.key"))
+      assertNull(capture.get(DynamicConfig.Broker.FollowerReplicationThrottledRateProp))
+      assertNull(capture.get(DynamicConfig.Broker.LeaderReplicationThrottledRateProp))
+    }
+    assertEquals(3, propsCapture.getValues.size) //3 brokers
+  }
+
+  @Test
+  def shouldRemoveThrottleReplicaListBasedOnProposedAssignment(): Unit = {
+
+    //Given two topics with existing config
+    val proposed = mutable.Map(
+      TopicAndPartition("topic1", 0) -> Seq(100, 101),
+      TopicAndPartition("topic2", 0) -> Seq(100, 101)
+    )
+    val status = mutable.Map(TopicAndPartition("topic1", 0) -> ReassignmentCompleted)
+    val existingConfigs = CoreUtils.propsWith(
+      (LogConfig.LeaderReplicationThrottledReplicasProp, "1:100:2:100"),
+      (LogConfig.FollowerReplicationThrottledReplicasProp, "1:101,2:101"),
+      ("useful.key", "value")
+    )
+
+    //Setup
+    val zk = stubZK(brokers = Seq(100, 101))
+    val admin = createMock(classOf[AdminUtilities])
+    val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
+    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Broker), anyString())).andStubReturn(new
Properties)
+    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Topic), is("topic1"))).andStubReturn(copyOf(existingConfigs))
+    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Topic), is("topic2"))).andStubReturn(copyOf(existingConfigs))
+
+    //Should change both topics
+    expect(admin.changeTopicConfig(is(zk), is("topic1"), capture(propsCapture)))
+    expect(admin.changeTopicConfig(is(zk), is("topic2"), capture(propsCapture)))
+
+    replay(admin)
+
+    //When
+    ReassignPartitionsCommand.removeThrottle(zk, proposed, status, admin)
+
+    //Then props should have gone (dummy remains)
+    for (actual <- propsCapture.getValues) {
+      assertEquals("value", actual.getProperty("useful.key"))
+      assertNull(actual.getProperty(LogConfig.LeaderReplicationThrottledReplicasProp))
+      assertNull(actual.getProperty(LogConfig.FollowerReplicationThrottledReplicasProp))
+    }
+    assertEquals(2, propsCapture.getValues.size) //2 topics
+  }
+
+  //Override eq as is for brevity
+  def is[T](v: T): T = EasyMock.eq(v)
+
   @Before
   def setup(): Unit = {
     calls = 0
   }
+
+  def stubZK(existingAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map[TopicAndPartition,
Seq[Int]](),
+             brokers: Seq[Int] = Seq[Int]()): ZkUtils = {
+    val zk = createMock(classOf[ZkUtils])
+    expect(zk.getReplicaAssignmentForTopics(anyObject().asInstanceOf[Seq[String]])).andStubReturn(existingAssignment)
+    expect(zk.getAllBrokersInCluster()).andStubReturn(brokers.map { id => new Broker(id,
"", 1, SecurityProtocol.PLAINTEXT) })
+    replay(zk)
+    zk
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7da847f/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 67c79ce..06d88ee 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1040,6 +1040,12 @@ object TestUtils extends Logging {
     new String(bytes, encoding)
   }
 
+  def copyOf(props: Properties): Properties = {
+    val copy = new Properties()
+    copy.putAll(props)
+    copy
+  }
+
   def sslConfigs(mode: Mode, clientCert: Boolean, trustStoreFile: Option[File], certAlias:
String): Properties = {
     val trustStore = trustStoreFile.getOrElse {
       throw new Exception("SSL enabled but no trustStoreFile provided")


Mime
View raw message