kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [1/2] kafka git commit: KAFKA-2675; SASL/Kerberos follow up
Date Wed, 28 Oct 2015 17:48:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b6fe164dd -> 9855bb9c6


http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index d5ab262..8a5038f 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -513,7 +513,7 @@ class KafkaConfigTest {
         case KafkaConfig.SaslKerberosTicketRenewWindowFactorProp =>
         case KafkaConfig.SaslKerberosTicketRenewJitterProp =>
         case KafkaConfig.SaslKerberosMinTimeBeforeReloginProp =>
-        case KafkaConfig.AuthToLocalProp => // ignore string
+        case KafkaConfig.SaslKerberosPrincipalToLocalRulesProp => // ignore string
 
         case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name,
"not_a_number", "-1")
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9855bb9c/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 5ad548d..4059512 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -427,6 +427,20 @@ object TestUtils extends Logging {
     new Producer[K, V](new ProducerConfig(props))
   }
 
+  private def securityConfigs(mode: Mode,
+                              securityProtocol: SecurityProtocol,
+                              trustStoreFile: Option[File],
+                              certAlias: String): Properties = {
+    val props = new Properties
+    if (usesSslTransportLayer(securityProtocol))
+      props.putAll(sslConfigs(mode, false, trustStoreFile, certAlias))
+    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
+    props
+  }
+
+  def producerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File]):
Properties =
+    securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "producer")
+
   /**
    * Create a (new) producer with a few pre-configured properties.
    */
@@ -439,7 +453,7 @@ object TestUtils extends Logging {
                         lingerMs: Long = 0,
                         securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
                         trustStoreFile: Option[File] = None,
-                        props: Option[Properties] = None) : KafkaProducer[Array[Byte],Array[Byte]]
= {
+                        props: Option[Properties] = None): KafkaProducer[Array[Byte], Array[Byte]]
= {
     import org.apache.kafka.clients.producer.ProducerConfig
 
     val producerProps = props.getOrElse(new Properties)
@@ -461,9 +475,8 @@ object TestUtils extends Logging {
       if (!producerProps.containsKey(key)) producerProps.put(key, value)
     }
 
-    if (usesSslTransportLayer(securityProtocol))
-      producerProps.putAll(sslConfigs(Mode.CLIENT, false, trustStoreFile, "producer"))
-    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
+    producerProps.putAll(producerSecurityConfigs(securityProtocol, trustStoreFile))
+
     new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
   }
 
@@ -472,6 +485,9 @@ object TestUtils extends Logging {
     case _ => false
   }
 
+  def consumerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File]):
Properties =
+    securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "consumer")
+
   /**
    * Create a new consumer with a few pre-configured properties.
    */
@@ -490,15 +506,16 @@ object TestUtils extends Logging {
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset)
     consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, partitionFetchSize.toString)
+
     consumerProps.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
     consumerProps.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partitionAssignmentStrategy)
     consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout.toString)
-    if (usesSslTransportLayer(securityProtocol))
-      consumerProps.putAll(sslConfigs(Mode.CLIENT, false, trustStoreFile, "consumer"))
-    consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
+
+    consumerProps.putAll(consumerSecurityConfigs(securityProtocol, trustStoreFile))
+
     new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
   }
 


Mime
View raw message