kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3414; Return of MetadataCache.getAliveBrokers should not be mutated by cache updates
Date Thu, 17 Mar 2016 20:56:30 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c07d01722 -> 8ef804dc1


KAFKA-3414; Return of MetadataCache.getAliveBrokers should not be mutated by cache updates

`Map.values` returns `DefaultValuesIterable` where the default implementation of `toSeq` is
(sadly) `toStream`. `Stream` is a lazy collection and it can reflect changes to the underlying
map before it's `forced`.

I verified that the test failed before my change.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Gwen Shapira

Closes #1088 from ijuma/kafka-3414-get-alive-brokers-no-mutation


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8ef804dc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8ef804dc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8ef804dc

Branch: refs/heads/trunk
Commit: 8ef804dc194bb562b6dbe48855e81965cacd1114
Parents: c07d017
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Thu Mar 17 13:56:27 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Thu Mar 17 13:56:27 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/server/MetadataCache.scala |  2 +-
 .../unit/kafka/server/MetadataCacheTest.scala   | 28 ++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8ef804dc/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index b23ecbe..4b68f70 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -132,7 +132,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
 
   def getAliveBrokers: Seq[Broker] = {
     inReadLock(partitionMetadataLock) {
-      aliveBrokers.values.toSeq
+      aliveBrokers.values.toBuffer
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ef804dc/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index dcc310f..017faea 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -238,4 +238,32 @@ class MetadataCacheTest {
 
   }
 
+  @Test
+  def getAliveBrokersShouldNotBeMutatedByUpdateCache() {
+    val topic = "topic"
+    val cache = new MetadataCache(1)
+
+    def updateCache(brokerIds: Set[Int]) {
+      val brokers = brokerIds.map { brokerId =>
+        new Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, "")
+      }
+      val controllerEpoch = 1
+      val leader = 0
+      val leaderEpoch = 0
+      val replicas = asSet[Integer](0)
+      val isr = asList[Integer](0, 1)
+      val partitionStates = Map(
+        new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas))
+      val updateMetadataRequest = new UpdateMetadataRequest(2, controllerEpoch, partitionStates.asJava, brokers.asJava)
+      cache.updateCache(15, updateMetadataRequest)
+    }
+
+    val initialBrokerIds = (0 to 2).toSet
+    updateCache(initialBrokerIds)
+    val aliveBrokersFromCache = cache.getAliveBrokers
+    // This should not change `aliveBrokersFromCache`
+    updateCache((0 to 3).toSet)
+    assertEquals(initialBrokerIds, aliveBrokersFromCache.map(_.id).toSet)
+  }
+
 }


Mime
View raw message