kafka-commits mailing list archives

From: guozh...@apache.org
Subject: kafka git commit: KAFKA-1580; disallow non-admin clients to produce to internal (e.g. offsets) topics; reviewed by Joel Koshy
Date: Mon, 24 Nov 2014 22:51:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 72601f783 -> 9f8b8dad2


KAFKA-1580; disallow non-admin clients to produce to internal (e.g. offsets) topics; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f8b8dad
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f8b8dad
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f8b8dad

Branch: refs/heads/trunk
Commit: 9f8b8dad2b7ad31c9595f559f6d9e7d07d2f696d
Parents: 72601f7
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Mon Nov 24 14:38:49 2014 -0800
Committer: Guozhang Wang <guwang@linkedin.com>
Committed: Mon Nov 24 14:38:49 2014 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala | 21 +++--
 .../src/main/scala/kafka/server/KafkaApis.scala |  5 ++
 .../main/scala/kafka/server/OffsetManager.scala | 13 +--
 .../scala/kafka/server/ReplicaManager.scala     | 88 +++++++++++---------
 .../kafka/api/ProducerFailureHandlingTest.scala | 23 ++---
 5 files changed, 87 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
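In short, this change threads an internalTopicsAllowed flag from KafkaApis through ReplicaManager.appendMessages down to appendToLocalLog, which now rejects appends to internal topics (e.g. the offsets topic) unless the producer identifies itself with the reserved admin client id. A minimal sketch of the gating logic, using simplified stand-ins for the actual Kafka classes:

    // Sketch only: stand-ins for kafka.common.Topic and kafka.admin.AdminUtils.
    object InternalSketch {
      val InternalTopics = Set("__consumer_offsets")
      val AdminClientId  = "__admin_client"
    }

    // The flag is computed once per produce request from the client-supplied id ...
    def internalTopicsAllowed(clientId: String): Boolean =
      clientId == InternalSketch.AdminClientId

    // ... and checked per partition before the log is touched.
    def mayAppend(topic: String, clientId: String): Boolean =
      !InternalSketch.InternalTopics.contains(topic) || internalTopicsAllowed(clientId)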


http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8b8dad/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 94c5332..28b12c7 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -17,25 +17,30 @@
 
 package kafka.admin
 
-import java.util.Random
-import java.util.Properties
-import kafka.api.{TopicMetadata, PartitionMetadata}
+import kafka.common._
 import kafka.cluster.Broker
 import kafka.log.LogConfig
 import kafka.utils.{Logging, ZkUtils, Json}
-import org.I0Itec.zkclient.ZkClient
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import kafka.api.{TopicMetadata, PartitionMetadata}
+
+import java.util.Random
+import java.util.Properties
+import scala.Some
+import scala.Predef._
 import scala.collection._
 import mutable.ListBuffer
 import scala.collection.mutable
-import kafka.common._
-import scala.Predef._
 import collection.Map
-import scala.Some
 import collection.Set
 
+import org.I0Itec.zkclient.ZkClient
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+
 object AdminUtils extends Logging {
   val rand = new Random
+
+  val AdminClientId = "__admin_client"
+
   val TopicConfigChangeZnodePrefix = "config_change_"
 
   /**

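The new AdminClientId constant is the reserved client id that marks a producer as administrative; only requests carrying it may append to internal topics. A hypothetical sketch of how a broker-side tool might opt in (the two config keys are the standard new-producer settings; the rest is illustrative):

    import java.util.Properties
    import kafka.admin.AdminUtils

    // Illustrative only: a tool that legitimately writes to an internal topic
    // must identify itself with the reserved admin client id.
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("client.id", AdminUtils.AdminClientId) // resolves to "__admin_client"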
http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8b8dad/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 968b0c4..2a1c032 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -188,10 +188,15 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
+    // only allow appending to internal topic partitions
+    // if the request comes from the admin client
+    val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId
+
     // call the replica manager to append messages to the replicas
     replicaManager.appendMessages(
       produceRequest.ackTimeoutMs.toLong,
       produceRequest.requiredAcks,
+      internalTopicsAllowed,
       produceRequest.data,
       sendResponseCallback)
 

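Note that the gate keys off the client-supplied clientId rather than an authenticated principal (the broker has no client authentication at this point), so it protects internal topics from accidental writes, not determined ones. A rejected partition also does not fail the whole request; roughly, the InvalidTopicException recorded below in ReplicaManager becomes a per-partition error in the producer response, along these lines:

    import kafka.api.ProducerResponseStatus
    import kafka.common.{ErrorMapping, InvalidTopicException}

    // Rough sketch (simplified): the exception stored in the partition's
    // LogAppendResult is translated into a per-partition error code, so the
    // other partitions in the same produce request can still succeed.
    val rejected = ProducerResponseStatus(
      ErrorMapping.codeFor(classOf[InvalidTopicException].asInstanceOf[Class[Throwable]]),
      -1L) // no offset was assigned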
http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8b8dad/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 2957bc4..3c79428 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -29,6 +29,7 @@ import kafka.message._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
 import kafka.tools.MessageFormatter
+import kafka.api.ProducerResponseStatus
 
 import scala.Some
 import scala.collection._
@@ -40,7 +41,6 @@ import java.util.concurrent.TimeUnit
 
 import com.yammer.metrics.core.Gauge
 import org.I0Itec.zkclient.ZkClient
-import kafka.api.ProducerResponseStatus
 
 
 /**
@@ -206,7 +206,7 @@ class OffsetManager(val config: OffsetManagerConfig,
   * Store offsets by appending them to the replicated log and then inserting into the cache
    */
  // TODO: generation id and consumer id is needed by coordinator to do consumer checking in the future
-  def storeOffsets(groupName: String,
+  def storeOffsets(groupId: String,
                    consumerId: String,
                    generationId: Int,
                    offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
@@ -221,12 +221,12 @@ class OffsetManager(val config: OffsetManagerConfig,
     // construct the message set to append
     val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
       new Message(
-        key = OffsetManager.offsetCommitKey(groupName, topicAndPartition.topic, topicAndPartition.partition),
+        key = OffsetManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition),
         bytes = OffsetManager.offsetCommitValue(offsetAndMetadata)
       )
     }.toSeq
 
-    val offsetTopicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, partitionFor(groupName))
+    val offsetTopicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, partitionFor(groupId))
 
     val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
@@ -245,12 +245,12 @@ class OffsetManager(val config: OffsetManagerConfig,
       val responseCode =
         if (status.error == ErrorMapping.NoError) {
           filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
-            putOffset(GroupTopicPartition(groupName, topicAndPartition), offsetAndMetadata)
+            putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata)
           }
           ErrorMapping.NoError
         } else {
          debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"
-            .format(filteredOffsetMetadata, groupName, consumerId, generationId, ErrorMapping.exceptionNameFor(status.error)))
+            .format(filteredOffsetMetadata, groupId, consumerId, generationId, ErrorMapping.exceptionNameFor(status.error)))
 
          // transform the log append error code to the corresponding commit status error code
           if (status.error == ErrorMapping.UnknownTopicOrPartitionCode)
@@ -278,6 +278,7 @@ class OffsetManager(val config: OffsetManagerConfig,
     replicaManager.appendMessages(
       config.offsetCommitTimeoutMs.toLong,
       config.offsetCommitRequiredAcks,
+      true, // allow appending to internal offset topic
       offsetsAndMetadataMessageSet,
       putCacheCallback)
   }

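OffsetManager is itself a legitimate writer to the offsets topic (consumer offset commits are stored by producing to it), so its append hard-codes the flag to true. Condensed with named arguments for clarity, the call above has this shape:

    // Condensed view of the call above; names follow the new appendMessages signature.
    replicaManager.appendMessages(
      timeout               = config.offsetCommitTimeoutMs.toLong,
      requiredAcks          = config.offsetCommitRequiredAcks,
      internalTopicsAllowed = true, // broker-internal write to the offsets topic
      messagesPerPartition  = offsetsAndMetadataMessageSet,
      responseCallback      = putCacheCallback)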
http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8b8dad/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f043f04..b3566b0 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -249,11 +249,12 @@ class ReplicaManager(val config: KafkaConfig,
    */
   def appendMessages(timeout: Long,
                      requiredAcks: Short,
+                     internalTopicsAllowed: Boolean,
                      messagesPerPartition: Map[TopicAndPartition, MessageSet],
                     responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) {
 
     val sTime = SystemTime.milliseconds
-    val localProduceResults = appendToLocalLog(messagesPerPartition, requiredAcks)
+    val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
 
     val produceStatus = localProduceResults.map{ case (topicAndPartition, result) =>
@@ -292,50 +293,59 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Append the messages to the local replica logs
    */
-  private def appendToLocalLog(messagesPerPartition: Map[TopicAndPartition, MessageSet],
+  private def appendToLocalLog(internalTopicsAllowed: Boolean,
+                               messagesPerPartition: Map[TopicAndPartition, MessageSet],
                               requiredAcks: Short): Map[TopicAndPartition, LogAppendResult] = {
     trace("Append [%s] to local log ".format(messagesPerPartition))
     messagesPerPartition.map { case (topicAndPartition, messages) =>
-      try {
-        val partitionOpt = getPartition(topicAndPartition.topic, topicAndPartition.partition)
-        val info = partitionOpt match {
-          case Some(partition) =>
-            partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], requiredAcks)
-          case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
-            .format(topicAndPartition, localBrokerId))
-        }
+      // reject appending to internal topics if it is not allowed
+      if (Topic.InternalTopics.contains(topicAndPartition.topic) && !internalTopicsAllowed) {
 
-        val numAppendedMessages =
-          if (info.firstOffset == -1L || info.lastOffset == -1L)
-            0
-          else
-            info.lastOffset - info.firstOffset + 1
+        (topicAndPartition, LogAppendResult(
+          LogAppendInfo.UnknownLogAppendInfo,
+          Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic)))))
+      } else {
+        try {
+          val partitionOpt = getPartition(topicAndPartition.topic, topicAndPartition.partition)
+          val info = partitionOpt match {
+            case Some(partition) =>
+              partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], requiredAcks)
+            case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
+              .format(topicAndPartition, localBrokerId))
+          }
+
+          val numAppendedMessages =
+            if (info.firstOffset == -1L || info.lastOffset == -1L)
+              0
+            else
+              info.lastOffset - info.firstOffset + 1
 
-        // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
-        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
-        BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
-        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages)
-        BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
+          // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
+          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
+          BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
+          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages)
+          BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
 
-        trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
-          .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
-        (topicAndPartition, LogAppendResult(info))
-      } catch {
-        // NOTE: Failed produce requests metric is not incremented for known exceptions
-        // it is supposed to indicate un-expected failures of a broker in handling a produce request
-        case e: KafkaStorageException =>
-          fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
-          Runtime.getRuntime.halt(1)
-          (topicAndPartition, null)
-        case utpe: UnknownTopicOrPartitionException =>
-          (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe)))
-        case nle: NotLeaderForPartitionException =>
-          (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle)))
-        case e: Throwable =>
-          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
-          BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
-          error("Error processing append operation on partition %s".format(topicAndPartition), e)
-          (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
+          trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
+            .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
+          (topicAndPartition, LogAppendResult(info))
+        } catch {
+          // NOTE: Failed produce requests metric is not incremented for known exceptions
+          // it is supposed to indicate un-expected failures of a broker in handling a produce request
+          case e: KafkaStorageException =>
+            fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
+            Runtime.getRuntime.halt(1)
+            (topicAndPartition, null)
+          case utpe: UnknownTopicOrPartitionException =>
+            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe)))
+          case nle: NotLeaderForPartitionException =>
+            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle)))
+          case e: Throwable =>
+            BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
+            BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
+            error("Error processing append operation on partition %s".format(topicAndPartition), e)
+            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
+        }
       }
     }
   }

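The guard runs per partition inside appendToLocalLog, before any log interaction, so one rejected internal-topic partition in a multi-partition request leaves the others untouched. In isolation it reduces to roughly:

    import kafka.common.{InvalidTopicException, Topic}

    // Simplified sketch of the per-partition guard added above: None means the
    // append may proceed; Some(e) becomes that partition's error result.
    def guardInternalTopic(topic: String,
                           internalTopicsAllowed: Boolean): Option[InvalidTopicException] =
      if (Topic.InternalTopics.contains(topic) && !internalTopicsAllowed)
        Some(new InvalidTopicException("Cannot append to internal topic %s".format(topic)))
      else
        None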
http://git-wip-us.apache.org/repos/asf/kafka/blob/9f8b8dad/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 8531f53..a913fe5 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -15,27 +15,27 @@
  * limitations under the License.
  */
 
-package kafka.api
+package kafka.api.test
 
-import kafka.common.Topic
-import org.apache.kafka.common.errors.{InvalidTopicException,NotEnoughReplicasException}
-import org.scalatest.junit.JUnit3Suite
 import org.junit.Test
 import org.junit.Assert._
 
-import java.util.{Properties, Random}
 import java.lang.Integer
+import java.util.{Properties, Random}
 import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
 
+import kafka.api.FetchRequestBuilder
+import kafka.common.Topic
+import kafka.consumer.SimpleConsumer
 import kafka.server.KafkaConfig
-import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils}
 import kafka.integration.KafkaServerTestHarness
-import kafka.consumer.SimpleConsumer
+import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils}
 
 import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException}
 import org.apache.kafka.clients.producer._
 
-class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarness {
+class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   private val producerBufferSize = 30000
   private val serverMessageMaxBytes =  producerBufferSize/2
 
@@ -297,9 +297,12 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarness
     assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize)
   }
 
-  @Test(expected = classOf[InvalidTopicException])
+  @Test
   def testCannotSendToInternalTopic() {
-    producer1.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
+    val thrown = intercept[ExecutionException] {
+      producer2.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
+    }
+    assertTrue(thrown.getCause.isInstanceOf[InvalidTopicException])
   }
 
   @Test

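On the test side, the new producer is asynchronous: the broker-side error surfaces only when the returned Future is awaited, wrapped in a java.util.concurrent.ExecutionException, so the test now intercepts the wrapper and asserts on getCause rather than expecting the raw exception. Outside ScalaTest, the same unwrap pattern would look roughly like:

    import java.util.concurrent.ExecutionException
    import org.apache.kafka.common.errors.InvalidTopicException

    // Rough sketch: awaiting the producer future and unwrapping the broker error.
    try {
      producer.send(record).get() // blocks until the broker responds
    } catch {
      case e: ExecutionException if e.getCause.isInstanceOf[InvalidTopicException] =>
        () // expected: non-admin clients may not produce to internal topics
    }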
