kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-6319: Quote strings stored in JSON configs
Date Tue, 12 Dec 2017 10:26:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a5cd34d79 -> 651c6e480


KAFKA-6319: Quote strings stored in JSON configs

This is required for ACLs where SSL principals contain
special characters (e.g. comma) that are escaped using
backslash. The strings need to be quoted for JSON to
ensure that the JSON stored in ZK is valid.

Also converted `SslEndToEndAuthorizationTest` to use a
principal with special characters to ensure that this
path is tested.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #4303 from rajinisivaram/KAFKA-6319


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/651c6e48
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/651c6e48
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/651c6e48

Branch: refs/heads/trunk
Commit: 651c6e480a233ce374bc0a9128f0398d18de7898
Parents: a5cd34d
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Tue Dec 12 11:05:29 2017 +0200
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Dec 12 12:05:11 2017 +0200

----------------------------------------------------------------------
 .../org/apache/kafka/test/TestSslUtils.java     |  4 +--
 core/src/main/scala/kafka/utils/Json.scala      | 13 ++++++--
 .../kafka/api/IntegrationTestHarness.scala      | 11 ++++--
 .../api/SslEndToEndAuthorizationTest.scala      | 35 ++++++++++++++++----
 .../scala/unit/kafka/admin/AclCommandTest.scala |  4 ++-
 .../test/scala/unit/kafka/utils/JsonTest.scala  | 14 ++++++++
 .../test/scala/unit/kafka/utils/TestUtils.scala | 18 ++++++----
 7 files changed, 78 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/651c6e48/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index 6c057d0..653ac82 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -179,9 +179,9 @@ public class TestSslUtils {
     }
 
     public static  Map<String, Object> createSslConfig(boolean useClientCert, boolean
trustStore,
-            Mode mode, File trustStoreFile, String certAlias, String hostName)
+            Mode mode, File trustStoreFile, String certAlias, String cn)
         throws IOException, GeneralSecurityException {
-        return createSslConfig(useClientCert, trustStore, mode, trustStoreFile, certAlias,
hostName, new CertificateBuilder());
+        return createSslConfig(useClientCert, trustStore, mode, trustStoreFile, certAlias,
cn, new CertificateBuilder());
     }
 
     public static  Map<String, Object> createSslConfig(boolean useClientCert, boolean
trustStore,

http://git-wip-us.apache.org/repos/asf/kafka/blob/651c6e48/core/src/main/scala/kafka/utils/Json.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala
index ad40c49..c654d45 100644
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ b/core/src/main/scala/kafka/utils/Json.scala
@@ -36,7 +36,16 @@ object Json {
    */
   def parseFull(input: String): Option[JsonValue] =
     try Option(mapper.readTree(input)).map(JsonValue(_))
-    catch { case _: JsonProcessingException => None }
+    catch {
+      case _: JsonProcessingException =>
+        // Before 1.0.1, Json#encode did not escape backslash or any other special characters.
SSL principals
+        // stored in ACLs may contain backslash as an escape char, making the JSON generated
in earlier versions invalid.
+        // Escape backslash and retry to handle these strings which may have been persisted
in ZK.
+        // Note that this does not handle all special characters (e.g. non-escaped double
quotes are not supported)
+        val escapedInput = input.replaceAll("\\\\", "\\\\\\\\")
+        try Option(mapper.readTree(escapedInput)).map(JsonValue(_))
+        catch { case _: JsonProcessingException => None }
+    }
 
   /**
    * Parse a JSON byte array into a JsonValue if possible. `None` is returned if `input`
is not valid JSON.
@@ -56,7 +65,7 @@ object Json {
     obj match {
       case null => "null"
       case b: Boolean => b.toString
-      case s: String => "\"" + s + "\""
+      case s: String => mapper.writeValueAsString(s)
       case n: Number => n.toString
       case m: Map[_, _] => "{" +
         m.map {

http://git-wip-us.apache.org/repos/asf/kafka/blob/651c6e48/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 4b27239..9c1b479 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -26,7 +26,7 @@ import java.util.Properties
 import org.apache.kafka.clients.producer.KafkaProducer
 import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness
-import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.junit.{After, Before}
 
 import scala.collection.mutable.Buffer
@@ -69,8 +69,8 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
 
   @Before
   override def setUp() {
-    val producerSecurityProps = TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile,
clientSaslProperties)
-    val consumerSecurityProps = TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile,
clientSaslProperties)
+    val producerSecurityProps = clientSecurityProps("producer")
+    val consumerSecurityProps = clientSecurityProps("consumer")
     super.setUp()
     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
@@ -87,6 +87,11 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
     TestUtils.createOffsetsTopic(zkUtils, servers)
   }
 
+  def clientSecurityProps(certAlias: String): Properties = {
+    TestUtils.securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, certAlias, TestUtils.SslCertificateCn,
+      clientSaslProperties)
+  }
+
   def createNewProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
       TestUtils.createNewProducer(brokerList,
                                   securityProtocol = this.securityProtocol,

http://git-wip-us.apache.org/repos/asf/kafka/blob/651c6e48/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
index b304f29..8354ee0 100644
--- a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
@@ -17,20 +17,29 @@
 
 package kafka.api
 
+import java.util.Properties
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.config.SslConfigs
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.network.Mode
 import org.apache.kafka.common.security.auth._
 import org.junit.Before
 
 object SslEndToEndAuthorizationTest {
   class TestPrincipalBuilder extends KafkaPrincipalBuilder {
-    private val Pattern = "O=A (.*?),CN=localhost".r
+    private val Pattern = "O=A (.*?),CN=(.*?)".r
 
+    // Use full DN as client principal to test special characters in principal
+    // Use field from DN as server principal to test custom PrincipalBuilder
     override def build(context: AuthenticationContext): KafkaPrincipal = {
       context match {
         case ctx: SslAuthenticationContext =>
-          ctx.session.getPeerPrincipal.getName match {
-            case Pattern(name) =>
-              new KafkaPrincipal(KafkaPrincipal.USER_TYPE, name)
+          val peerPrincipal = ctx.session.getPeerPrincipal.getName
+          peerPrincipal match {
+            case Pattern(name, _) =>
+              val principal = if (name == "server") name else peerPrincipal
+              new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal)
             case _ =>
               KafkaPrincipal.ANONYMOUS
           }
@@ -40,18 +49,32 @@ object SslEndToEndAuthorizationTest {
 }
 
 class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
+
   import kafka.api.SslEndToEndAuthorizationTest.TestPrincipalBuilder
 
   override protected def securityProtocol = SecurityProtocol.SSL
+
   this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required")
   this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[TestPrincipalBuilder].getName)
-  override val clientPrincipal = "client"
+  // Escaped characters in DN attribute values: from http://www.ietf.org/rfc/rfc2253.txt
+  // - a space or "#" character occurring at the beginning of the string
+  // - a space character occurring at the end of the string
+  // - one of the characters ",", "+", """, "\", "<", ">" or ";"
+  //
+  // Leading and trailing spaces in Kafka principal don't work with ACLs, but we can work around
this by using
+  // a PrincipalBuilder that removes/replaces them.
+  private val clientCn = """\#A client with special chars in CN : (\, \+ \" \\ \< \>
\; ')"""
+  override val clientPrincipal = s"O=A client,CN=$clientCn"
   override val kafkaPrincipal = "server"
-
   @Before
   override def setUp() {
     startSasl(jaasSections(List.empty, None, ZkSasl))
     super.setUp()
   }
 
+  override def clientSecurityProps(certAlias: String): Properties = {
+    val props = TestUtils.securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile,
certAlias, clientCn, clientSaslProperties)
+    props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
+    props
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/651c6e48/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index ecf3427..7f71faf 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -28,7 +28,9 @@ import org.junit.Test
 
 class AclCommandTest extends ZooKeeperTestHarness with Logging {
 
-  private val Users = Set(KafkaPrincipal.fromString("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
KafkaPrincipal.fromString("User:test2"))
+  private val Users = Set(KafkaPrincipal.fromString("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
+    KafkaPrincipal.fromString("User:test2"),
+    KafkaPrincipal.fromString("""User:CN=\#User with special chars in CN : (\, \+ \" \\ \<
\> \; ')"""))
   private val Hosts = Set("host1", "host2")
   private val AllowHostCommand = Array("--allow-host", "host1", "--allow-host", "host2")
   private val DenyHostCommand = Array("--deny-host", "host1", "--deny-host", "host2")

http://git-wip-us.apache.org/repos/asf/kafka/blob/651c6e48/core/src/test/scala/unit/kafka/utils/JsonTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
index 8bba50b..93509b4 100644
--- a/core/src/test/scala/unit/kafka/utils/JsonTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
@@ -21,7 +21,9 @@ import org.junit.Test
 import com.fasterxml.jackson.databind.JsonNode
 import com.fasterxml.jackson.databind.node._
 import kafka.utils.json.JsonValue
+
 import scala.collection.JavaConverters._
+import scala.collection.Map
 
 class JsonTest {
 
@@ -42,6 +44,16 @@ class JsonTest {
     val arrayNode = new ArrayNode(jnf)
     Vector(1, 2, 3).map(new IntNode(_)).foreach(arrayNode.add)
     assertEquals(Json.parseFull("[1, 2, 3]"), Some(JsonValue(arrayNode)))
+
+    // Test with encoder that properly escapes backslash and quotes
+    val map = Map("foo1" -> """bar1\,bar2""", "foo2" -> """\bar""")
+    val encoded = Json.encode(map)
+    val decoded = Json.parseFull(encoded)
+    assertEquals(Json.parseFull("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}"""), decoded)
+
+    // Test strings with non-escaped backslash (non-escaped double quotes are not supported).
+    // This is to verify that ACLs containing non-escaped chars persisted using 1.0 can be parsed.
+    assertEquals(decoded, Json.parseFull("""{"foo1":"bar1\,bar2", "foo2":"\bar"}"""))
   }
 
   @Test
@@ -61,6 +73,8 @@ class JsonTest {
     assertEquals("{}", Json.encode(Map()))
     assertEquals("{\"a\":1,\"b\":2}", Json.encode(Map("a" -> 1, "b" -> 2)))
     assertEquals("{\"a\":[1,2],\"c\":[3,4]}", Json.encode(Map("a" -> Seq(1,2), "c" ->
Seq(3,4))))
+    assertEquals(""""str1\\,str2"""", Json.encode("""str1\,str2"""))
+    assertEquals(""""\"quoted\""""", Json.encode(""""quoted""""))
   }
   
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/651c6e48/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 360b9dc..1da2d7b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -76,6 +76,8 @@ object TestUtils extends Logging {
   val MockZkPort = 1
   /** ZooKeeper connection string to use for unit tests that mock/don't require a real ZK
server. */
   val MockZkConnect = "127.0.0.1:" + MockZkPort
+  // CN in SSL certificates - this is used for endpoint validation when enabled
+  val SslCertificateCn = "localhost"
 
   private val transactionStatusKey = "transactionStatus"
   private val committedValue : Array[Byte] = "committed".getBytes(StandardCharsets.UTF_8)
@@ -519,14 +521,15 @@ object TestUtils extends Logging {
     new Producer[K, V](new kafka.producer.ProducerConfig(props))
   }
 
-  private def securityConfigs(mode: Mode,
+  def securityConfigs(mode: Mode,
                               securityProtocol: SecurityProtocol,
                               trustStoreFile: Option[File],
                               certAlias: String,
+                              certCn: String,
                               saslProperties: Option[Properties]): Properties = {
     val props = new Properties
     if (usesSslTransportLayer(securityProtocol))
-      props ++= sslConfigs(mode, securityProtocol == SecurityProtocol.SSL, trustStoreFile,
certAlias)
+      props ++= sslConfigs(mode, securityProtocol == SecurityProtocol.SSL, trustStoreFile,
certAlias, certCn)
 
     if (usesSaslAuthentication(securityProtocol))
       props ++= JaasTestUtils.saslConfigs(saslProperties)
@@ -535,7 +538,7 @@ object TestUtils extends Logging {
   }
 
   def producerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File],
saslProperties: Option[Properties]): Properties =
-    securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "producer", saslProperties)
+    securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "producer", SslCertificateCn,
saslProperties)
 
   /**
    * Create a (new) producer with a few pre-configured properties.
@@ -596,10 +599,10 @@ object TestUtils extends Logging {
   }
 
   def consumerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File],
saslProperties: Option[Properties]): Properties =
-    securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "consumer", saslProperties)
+    securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "consumer", SslCertificateCn,
saslProperties)
 
   def adminClientSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File],
saslProperties: Option[Properties]): Properties =
-    securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "admin-client", saslProperties)
+    securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "admin-client", SslCertificateCn,
saslProperties)
 
   /**
    * Create a new consumer with a few pre-configured properties.
@@ -1220,12 +1223,13 @@ object TestUtils extends Logging {
     copy
   }
 
-  def sslConfigs(mode: Mode, clientCert: Boolean, trustStoreFile: Option[File], certAlias:
String): Properties = {
+  def sslConfigs(mode: Mode, clientCert: Boolean, trustStoreFile: Option[File], certAlias:
String,
+                 certCn: String = SslCertificateCn): Properties = {
     val trustStore = trustStoreFile.getOrElse {
       throw new Exception("SSL enabled but no trustStoreFile provided")
     }
 
-    val sslConfigs = TestSslUtils.createSslConfig(clientCert, true, mode, trustStore, certAlias)
+    val sslConfigs = TestSslUtils.createSslConfig(clientCert, true, mode, trustStore, certAlias,
certCn)
 
     val sslProps = new Properties()
     sslConfigs.asScala.foreach { case (k, v) => sslProps.put(k, v) }


Mime
View raw message