kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3402; Restore behaviour of MetadataCache.getTopicMetadata when unsupported security protocol is received
Date Wed, 16 Mar 2016 23:15:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk cd8bd606c -> 858047a12


KAFKA-3402; Restore behaviour of MetadataCache.getTopicMetadata when unsupported security protocol is received

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson, Grant Henke

Closes #1073 from ijuma/kafka-3402-restore-get-topic-metadata-behaviour


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/858047a1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/858047a1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/858047a1

Branch: refs/heads/trunk
Commit: 858047a12ba3a7d426178c63226dd2c7509f20dd
Parents: cd8bd60
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Wed Mar 16 16:15:55 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Wed Mar 16 16:15:55 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/server/MetadataCache.scala |  7 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  2 +-
 .../unit/kafka/server/MetadataCacheTest.scala   | 90 ++++++++++++++------
 3 files changed, 72 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/858047a1/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 6df261c..b23ecbe 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -24,7 +24,7 @@ import scala.collection.{Seq, Set, mutable}
 import scala.collection.JavaConverters._
 import kafka.cluster.{Broker, EndPoint}
 import kafka.api._
-import kafka.common.TopicAndPartition
+import kafka.common.{BrokerEndPointNotAvailableException, TopicAndPartition}
 import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch}
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
@@ -55,7 +55,10 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
   }
 
   private def getAliveEndpoint(brokerId: Int, protocol: SecurityProtocol): Option[Node] =
-    aliveNodes.get(brokerId).flatMap(_.get(protocol))
+    aliveNodes.get(brokerId).map { nodeMap =>
+      nodeMap.getOrElse(protocol,
+        throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not support security protocol `$protocol`"))
+    }
 
   private def getPartitionMetadata(topic: String, protocol: SecurityProtocol): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
     cache.get(topic).map { partitions =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/858047a1/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 33027e7..b09c541 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -187,7 +187,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   private def createUpdateMetadataRequest = {
     val partitionState = Map(tp -> new requests.UpdateMetadataRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
     val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
-      Map(SecurityProtocol.PLAINTEXT -> new requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava
+      Map(SecurityProtocol.PLAINTEXT -> new requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava, null)).asJava
     new requests.UpdateMetadataRequest(brokerId, Int.MaxValue, partitionState, brokers)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/858047a1/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index f3f0c87..dcc310f 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -19,10 +19,11 @@ package kafka.server
 import java.util
 import util.Arrays.asList
 
+import kafka.common.BrokerEndPointNotAvailableException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.UpdateMetadataRequest
-import org.apache.kafka.common.requests.UpdateMetadataRequest.{PartitionState, Broker, EndPoint}
+import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint, PartitionState}
 import org.junit.Test
 import org.junit.Assert._
 
@@ -49,10 +50,18 @@ class MetadataCacheTest {
     val zkVersion = 3
     val controllerId = 2
     val controllerEpoch = 1
-    val brokers = Set(
-      new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava),
-      new Broker(1, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava),
-      new Broker(2, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava))
+
+    def securityProtocolToEndPoint(brokerId: Int): Map[SecurityProtocol, EndPoint] = {
+      val host = s"foo-$brokerId"
+      Map(
+        SecurityProtocol.PLAINTEXT -> new EndPoint(host, 9092),
+        SecurityProtocol.SSL -> new EndPoint(host, 9093)
+      )
+    }
+
+    val brokers = (0 to 2).map { brokerId =>
+      new Broker(brokerId, securityProtocolToEndPoint(brokerId).asJava, "rack1")
+    }.toSet
 
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, 0, 0, asList(0), zkVersion, asSet(0)),
@@ -62,24 +71,32 @@ class MetadataCacheTest {
     val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava)
     cache.updateCache(15, updateMetadataRequest)
 
-    val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT)
-    assertEquals(1, topicMetadatas.size)
+    for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, SecurityProtocol.SSL)) {
+      val topicMetadatas = cache.getTopicMetadata(Set(topic), securityProtocol)
+      assertEquals(1, topicMetadatas.size)
+
+      val topicMetadata = topicMetadatas.head
+      assertEquals(Errors.NONE, topicMetadata.error)
+      assertEquals(topic, topicMetadata.topic)
+
+      val partitionMetadatas = topicMetadata.partitionMetadata.asScala.sortBy(_.partition)
+      assertEquals(3, partitionMetadatas.size)
+
+      for (i <- 0 to 2) {
+        val partitionMetadata = partitionMetadatas(i)
+        assertEquals(Errors.NONE, partitionMetadata.error)
+        assertEquals(i, partitionMetadata.partition)
+        val leader = partitionMetadata.leader
+        assertEquals(i, leader.id)
+        val endPoint = securityProtocolToEndPoint(partitionMetadata.leader.id)(securityProtocol)
+        assertEquals(endPoint.host, leader.host)
+        assertEquals(endPoint.port, leader.port)
+        assertEquals(List(i), partitionMetadata.isr.asScala.map(_.id))
+        assertEquals(List(i), partitionMetadata.replicas.asScala.map(_.id))
+      }
 
-    val topicMetadata = topicMetadatas.head
-    assertEquals(Errors.NONE, topicMetadata.error)
-    assertEquals(topic, topicMetadata.topic)
-
-    val partitionMetadatas = topicMetadata.partitionMetadata.asScala.sortBy(_.partition)
-    assertEquals(3, partitionMetadatas.size)
-
-    for (i <- 0 to 2) {
-      val partitionMetadata = partitionMetadatas(i)
-      assertEquals(Errors.NONE, partitionMetadata.error)
-      assertEquals(i, partitionMetadata.partition)
-      assertEquals(i, partitionMetadata.leader.id)
-      assertEquals(List(i), partitionMetadata.isr.asScala.map(_.id))
-      assertEquals(List(i), partitionMetadata.replicas.asScala.map(_.id))
     }
+
   }
 
   @Test
@@ -91,7 +108,7 @@ class MetadataCacheTest {
     val zkVersion = 3
     val controllerId = 2
     val controllerEpoch = 1
-    val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava))
+    val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, null))
 
     val leader = 1
     val leaderEpoch = 1
@@ -127,7 +144,7 @@ class MetadataCacheTest {
     val zkVersion = 3
     val controllerId = 2
     val controllerEpoch = 1
-    val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava))
+    val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, null))
 
     // replica 1 is not available
     val leader = 0
@@ -166,7 +183,7 @@ class MetadataCacheTest {
     val zkVersion = 3
     val controllerId = 2
     val controllerEpoch = 1
-    val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava))
+    val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, "rack1"))
 
     // replica 1 is not available
     val leader = 0
@@ -196,4 +213,29 @@ class MetadataCacheTest {
     assertEquals(Set(0), partitionMetadata.isr.asScala.map(_.id).toSet)
   }
 
+  @Test
+  def getTopicMetadataWithNonSupportedSecurityProtocol() {
+    val topic = "topic"
+    val cache = new MetadataCache(1)
+    val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, ""))
+    val controllerEpoch = 1
+    val leader = 0
+    val leaderEpoch = 0
+    val replicas = asSet[Integer](0)
+    val isr = asList[Integer](0, 1)
+    val partitionStates = Map(
+      new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas))
+    val updateMetadataRequest = new UpdateMetadataRequest(2, controllerEpoch, partitionStates.asJava, brokers.asJava)
+    cache.updateCache(15, updateMetadataRequest)
+
+    try {
+      val result = cache.getTopicMetadata(Set(topic), SecurityProtocol.SSL)
+      fail(s"Exception should be thrown by `getTopicMetadata` with non-supported SecurityProtocol, $result was returned instead")
+    }
+    catch {
+      case e: BrokerEndPointNotAvailableException => //expected
+    }
+
+  }
+
 }


Mime
View raw message