kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject git commit: KAFKA-1356 (Follow-up) patch to clean up metadata cache api; reviewed by Jun Rao
Date Fri, 18 Apr 2014 18:36:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.1 839f1b122 -> eaf514b41


KAFKA-1356 (Follow-up) patch to clean up metadata cache api; reviewed by
Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eaf514b4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eaf514b4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eaf514b4

Branch: refs/heads/0.8.1
Commit: eaf514b41a54242c825eab972765e2a764cd0b7f
Parents: 839f1b1
Author: Joel Koshy <jjkoshy@gmail.com>
Authored: Thu Apr 17 10:38:32 2014 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Fri Apr 18 11:34:32 2014 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala | 40 +++++++++-----------
 1 file changed, 18 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eaf514b4/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b9d2260..0513e45 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -27,7 +27,6 @@ import scala.collection._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic._
 import kafka.metrics.KafkaMetricsGroup
-import org.I0Itec.zkclient.ZkClient
 import kafka.common._
 import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
 import kafka.network.RequestChannel.Response
@@ -65,11 +64,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
     private val partitionMetadataLock = new ReentrantReadWriteLock()
 
-    def getTopicMetadata(topics: Set[String]): Tuple2[mutable.ListBuffer[TopicMetadata], mutable.ListBuffer[String]] = {
+    def getTopicMetadata(topics: Set[String]) = {
       val isAllTopics = topics.isEmpty
       val topicsRequested = if(isAllTopics) cache.keySet else topics
       val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
-      val topicsToBeCreated: mutable.ListBuffer[String] = new mutable.ListBuffer[String]
       inLock(partitionMetadataLock.readLock()) {
         for (topic <- topicsRequested) {
           if (isAllTopics || this.containsTopic(topic)) {
@@ -82,11 +80,11 @@ class KafkaApis(val requestChannel: RequestChannel,
               val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
               val leader = leaderIsrAndEpoch.leaderAndIsr.leader
               val isr = leaderIsrAndEpoch.leaderAndIsr.isr
-              debug("topic %s partition %s".format(topic, partitionId) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
+              val topicPartition = TopicAndPartition(topic, partitionId)
               try {
                 leaderInfo = aliveBrokers.get(leader)
                 if (!leaderInfo.isDefined)
-                  throw new LeaderNotAvailableException("Leader not available for topic %s partition %s".format(topic, partitionId))
+                  throw new LeaderNotAvailableException("Leader not available for %s.".format(topicPartition))
                 isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
                 if (replicaInfo.size < replicas.size)
                   throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
@@ -97,20 +95,16 @@ class KafkaApis(val requestChannel: RequestChannel,
                 new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
               } catch {
                 case e: Throwable =>
-                  debug("Error while fetching metadata for topic %s partition %s. Possible cause: %s".format(topic, partitionId, e.getMessage))
+                  debug("Error while fetching metadata for %s. Possible cause: %s".format(topicPartition, e.getMessage))
                   new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
                     ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
               }
             }
             topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
-          } else if (config.autoCreateTopicsEnable) {
-            topicsToBeCreated += topic
-          } else {
-            topicResponses += new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
           }
         }
       }
-      (topicResponses, topicsToBeCreated)
+      topicResponses
     }
 
     def addPartitionInfo(topic: String,
@@ -654,17 +648,19 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
-    val (topicResponses, topicsToBeCreated) = metadataCache.getTopicMetadata(topics)
-
-    topicResponses.appendAll(topicsToBeCreated.map { topic =>
-      try {
-        AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
-        info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
-      } catch {
-        case e: TopicExistsException => // let it go, possibly another broker created this topic
-      }
-      new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
-    })
+    val topicResponses = metadataCache.getTopicMetadata(topics)
+    if (topics.size > 0 && topicResponses.size != topics.size && config.autoCreateTopicsEnable) {
+      val topicsToBeCreated = topics -- topicResponses.map(_.topic).toSet
+      topicResponses.appendAll(topicsToBeCreated.map { topic =>
+        try {
+          AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+          info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
+        } catch {
+          case e: TopicExistsException => // let it go, possibly another broker created this topic
+        }
+        new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
+      })
+    }
 
     topicResponses
   }


Mime
View raw message