kafka-commits mailing list archives

From jjko...@apache.org
Subject [1/3] kafka git commit: KAFKA-1852; Reject offset commits to unknown topics; reviewed by Joel Koshy
Date Tue, 03 Mar 2015 19:46:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1cd6ed9e2 -> 57d38f672


KAFKA-1852; Reject offset commits to unknown topics; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/616987d1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/616987d1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/616987d1

Branch: refs/heads/trunk
Commit: 616987d196b654486a1261f4eed50e48560e3041
Parents: 1cd6ed9
Author: Sriharsha Chintalapani <schintalapani@hortonworks.com>
Authored: Tue Mar 3 11:16:38 2015 -0800
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Tue Mar 3 11:16:38 2015 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala | 22 +++++++++-----------
 .../main/scala/kafka/server/KafkaServer.scala   |  8 +++++--
 .../main/scala/kafka/server/MetadataCache.scala |  7 ++++++-
 .../main/scala/kafka/server/OffsetManager.scala | 19 ++++++++++++-----
 .../unit/kafka/server/OffsetCommitTest.scala    | 19 +++++++++++++++++
 5 files changed, 55 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
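
In outline, the patch threads one shared MetadataCache from KafkaServer into OffsetManager, whose storeOffsets now answers UnknownTopicOrPartitionCode for topics the broker does not know about, instead of silently storing their offsets. A rough self-contained sketch of that flow (hypothetical, simplified names; not the committed API):

  import scala.collection.immutable

  // Hypothetical sketch of the reject-unknown-topics flow in this patch.
  object RejectUnknownTopicsSketch {
    val NoErrorCode: Short = 0
    val UnknownTopicOrPartitionCode: Short = 3  // wire-protocol error code 3

    // `topicExists` stands in for MetadataCache.contains.
    def storeOffsets(offsets: immutable.Map[String, Long],
                     topicExists: String => Boolean): immutable.Map[String, Short] = {
      val (known, unknown) = offsets.partition { case (topic, _) => topicExists(topic) }
      // `known` would be appended to the offsets topic; `unknown` is rejected.
      known.map { case (topic, _) => topic -> NoErrorCode } ++
        unknown.map { case (topic, _) => topic -> UnknownTopicOrPartitionCode }
    }

    def main(args: Array[String]): Unit = {
      val liveTopics = Set("topic-2")
      println(storeOffsets(Map("topicDoesNotExists" -> 42L, "topic-2" -> 42L), liveTopics.contains))
      // expected: Map(topic-2 -> 0, topicDoesNotExists -> 3)
    }
  }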


http://git-wip-us.apache.org/repos/asf/kafka/blob/616987d1/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 703886a..35af98f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -45,10 +45,10 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val controller: KafkaController,
                 val zkClient: ZkClient,
                 val brokerId: Int,
-                val config: KafkaConfig) extends Logging {
+                val config: KafkaConfig,
+                val metadataCache: MetadataCache) extends Logging {
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
-  val metadataCache = new MetadataCache(brokerId)
 
   /**
    * Top-level method that handles all requests and multiplexes to the right api
@@ -149,7 +149,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId)
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
     }
-
     // call offset manager to store offsets
     offsetManager.storeOffsets(
       offsetCommitRequest.groupId,
@@ -273,7 +272,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             val hw = localReplica.highWatermark.messageOffset
             if (allOffsets.exists(_ > hw))
               hw +: allOffsets.dropWhile(_ > hw)
-            else 
+            else
               allOffsets
           }
         }
@@ -297,19 +296,19 @@ class KafkaApis(val requestChannel: RequestChannel,
     val response = OffsetResponse(offsetRequest.correlationId, responseMap)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
-  
+
  def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     logManager.getLog(topicAndPartition) match {
-      case Some(log) => 
+      case Some(log) =>
         fetchOffsetsBefore(log, timestamp, maxNumOffsets)
-      case None => 
+      case None =>
         if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
           Seq(0L)
         else
           Nil
     }
   }
-  
+
  private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     val segsArray = log.logSegments.toArray
     var offsetTimeArray: Array[(Long, Long)] = null
@@ -454,7 +453,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     import JavaConversions._
 
     val joinGroupRequest = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader]
-    
+
     // the callback for sending a join-group response
    def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) {
       val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer
@@ -472,7 +471,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       joinGroupRequest.body.strategy(),
       sendResponseCallback)
   }
-  
+
   def handleHeartbeatRequest(request: RequestChannel.Request) {
     val heartbeatRequest = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader]
 
@@ -489,11 +488,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       heartbeatRequest.body.groupGenerationId(),
       sendResponseCallback)
   }
-  
+
   def close() {
     // TODO currently closing the API is an no-op since the API no longer maintain any modules
    // maybe removing the closing call in the end when KafkaAPI becomes a pure stateless layer
     debug("Shut down complete.")
   }
 }
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/616987d1/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 426e522..8e3def9 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -68,6 +68,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
 
   var kafkaHealthcheck: KafkaHealthcheck = null
+  val metadataCache: MetadataCache = new MetadataCache(config.brokerId)
+
+
 
   var zkClient: ZkClient = null
   val correlationId: AtomicInteger = new AtomicInteger(0)
@@ -142,7 +145,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         consumerCoordinator.startup()
 
         /* start processing requests */
-        apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, kafkaController, zkClient, config.brokerId, config)
+        apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator,
+          kafkaController, zkClient, config.brokerId, config, metadataCache)
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
         brokerState.newState(RunningAsBroker)
 
@@ -402,7 +406,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
       offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
       offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
-    new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler)
+    new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler, metadataCache)
   }
 
   /**
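
The KafkaServer side is plain dependency injection: the server now owns the single MetadataCache and passes it to both KafkaApis and OffsetManager, where previously KafkaApis built a private instance the offset manager could not consult. Roughly, as a reduced hypothetical sketch (real names are KafkaServer, KafkaApis, OffsetManager, MetadataCache):

  // Hypothetical reduced sketch of the shared-cache wiring above.
  class Cache(brokerId: Int)
  class Apis(cache: Cache)      // previously constructed its own private cache
  class Manager(cache: Cache)   // new constructor dependency from this patch

  class Server(brokerId: Int) {
    val cache = new Cache(brokerId)   // one instance, owned by the server
    val manager = new Manager(cache)  // offset path can now validate topics
    val apis = new Apis(cache)        // request path reads the same view
  }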

http://git-wip-us.apache.org/repos/asf/kafka/blob/616987d1/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 4c70aa7..6aef6e4 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -136,6 +136,12 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
+  def contains(topic: String): Boolean = {
+    inReadLock(partitionMetadataLock) {
+      cache.contains(topic)
+    }
+  }
+
   private def removePartitionInfo(topic: String, partitionId: Int) = {
     cache.get(topic) match {
       case Some(infos) => {
@@ -149,4 +155,3 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 }
-
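
The new contains method runs under the same read lock as the cache's other lookups, so a commit-time membership test cannot observe a metadata update in progress. The same guarded-lookup shape as a self-contained sketch (hypothetical class; the real code uses Kafka's inReadLock helper over partitionMetadataLock):

  import java.util.concurrent.locks.ReentrantReadWriteLock
  import scala.collection.mutable

  // Standalone sketch of a read-guarded membership check over an internal map.
  class GuardedCache[K, V] {
    private val cache = mutable.Map.empty[K, V]
    private val lock = new ReentrantReadWriteLock

    def contains(key: K): Boolean = {
      lock.readLock.lock()
      try cache.contains(key)           // never sees a half-applied update
      finally lock.readLock.unlock()
    }

    def put(key: K, value: V): Unit = {
      lock.writeLock.lock()
      try cache.put(key, value)
      finally lock.writeLock.unlock()
    }
  }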

http://git-wip-us.apache.org/repos/asf/kafka/blob/616987d1/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index c602a80..d2d5962 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -86,7 +86,8 @@ object OffsetManagerConfig {
 class OffsetManager(val config: OffsetManagerConfig,
                     replicaManager: ReplicaManager,
                     zkClient: ZkClient,
-                    scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
+                    scheduler: Scheduler,
+                    metadataCache: MetadataCache) extends Logging with KafkaMetricsGroup {
 
   /* offsets and metadata cache */
   private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
@@ -164,6 +165,7 @@ class OffsetManager(val config: OffsetManagerConfig,
    debug("Removed %d stale offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds - startMs))
   }
 
+
   def offsetsTopicConfig: Properties = {
     val props = new Properties
     props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString)
@@ -214,11 +216,16 @@ class OffsetManager(val config: OffsetManagerConfig,
                    generationId: Int,
                    offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
                   responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
+    // check if there are any non-existent topics
+    val nonExistentTopics = offsetMetadata.filter { case (topicAndPartition, offsetMetadata) =>
+      !metadataCache.contains(topicAndPartition.topic)
+    }
 
-    // first filter out partitions with offset metadata size exceeding limit
+    // first filter out partitions with offset metadata size exceeding limit or
+    // if its a non existing topic
    // TODO: in the future we may want to only support atomic commit and hence fail the whole commit
    val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) =>
-      validateOffsetMetadataLength(offsetAndMetadata.metadata)
+      validateOffsetMetadataLength(offsetAndMetadata.metadata) || nonExistentTopics.contains(topicAndPartition)
     }
 
     // construct the message set to append
@@ -242,7 +249,7 @@ class OffsetManager(val config: OffsetManagerConfig,
           .format(responseStatus, offsetTopicPartition))
 
       // construct the commit response status and insert
-      // the offset and metadata to cache iff the append status has no error
+      // the offset and metadata to cache if the append status has no error
       val status = responseStatus(offsetTopicPartition)
 
       val responseCode =
@@ -267,7 +274,9 @@ class OffsetManager(val config: OffsetManagerConfig,
 
       // compute the final error codes for the commit response
      val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
-        if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
+        if (nonExistentTopics.contains(topicAndPartition))
+          (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode)
+        else if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
           (topicAndPartition, responseCode)
         else
           (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
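
The net effect on the commit response is a three-way precedence, visible in the commitStatus mapping above: unknown topic first, then metadata length, then the append result. Written out as a standalone function (illustrative sketch; the named values mirror the wire-protocol codes behind ErrorMapping):

  // Standalone sketch of the response-code precedence in storeOffsets above.
  object CommitResponseCodes {
    val NoError: Short = 0
    val UnknownTopicOrPartitionCode: Short = 3
    val OffsetMetadataTooLargeCode: Short = 12

    def commitResponseCode(topicKnown: Boolean,
                           metadataWithinLimit: Boolean,
                           appendResultCode: Short): Short =
      if (!topicKnown) UnknownTopicOrPartitionCode   // checked before anything else
      else if (metadataWithinLimit) appendResultCode // code from the log append
      else OffsetMetadataTooLargeCode
  }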

http://git-wip-us.apache.org/repos/asf/kafka/blob/616987d1/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index a2bb885..a37a74d 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -206,4 +206,23 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.commitStatus.get(topicAndPartition).get)
 
   }
+
+  @Test
+  def testNonExistingTopicOffsetCommit() {
+    val topic1 = "topicDoesNotExists"
+    val topic2 = "topic-2"
+
+    createTopic(zkClient, topic2, servers = Seq(server), numPartitions = 1)
+
+    // Commit an offset
+    val expectedReplicaAssignment = Map(0  -> List(1))
+    val commitRequest = OffsetCommitRequest(group, immutable.Map(
+      TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L),
+      TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=42L)
+    ))
+    val commitResponse = simpleConsumer.commitOffsets(commitRequest)
+
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
+    assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
+  }
 }
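
To run just the updated suite, the single-test Gradle invocation documented in the Kafka README of this period should apply (the exact flag and task names follow that README, not this commit):

  ./gradlew -Dtest.single=OffsetCommitTest core:test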

