kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2338; Warn on max.message.bytes change
Date Wed, 21 Oct 2015 00:21:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 403158b54 -> 44f6c4b94


KAFKA-2338; Warn on max.message.bytes change

- Both TopicCommand and ConfigCommand warn if message.max.bytes increases
- Log failures on the broker if replication gets stuck due to an oversized message
- Added blocking call to warning.

Author: benstopford <benstopford@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #322 from benstopford/CPKAFKA-61


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/44f6c4b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/44f6c4b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/44f6c4b9

Branch: refs/heads/trunk
Commit: 44f6c4b946511ce4663d41bf40f2960d2faee198
Parents: 403158b
Author: benstopford <benstopford@gmail.com>
Authored: Tue Oct 20 17:21:46 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Oct 20 17:21:46 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/admin/ConfigCommand.scala  | 16 +++++-
 .../main/scala/kafka/admin/TopicCommand.scala   | 56 +++++++++++++++++++-
 .../kafka/server/ReplicaFetcherThread.scala     |  9 ++++
 3 files changed, 78 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/44f6c4b9/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index ba4c003..a6984be 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -19,7 +19,9 @@ package kafka.admin
 
 import joptsimple._
 import java.util.Properties
-import kafka.log.LogConfig
+import kafka.admin.TopicCommand._
+import kafka.consumer.ConsumerConfig
+import kafka.log.{Defaults, LogConfig}
 import kafka.server.ConfigType
 import kafka.utils.{ZkUtils, CommandLineUtils}
 import org.I0Itec.zkclient.ZkClient
@@ -67,6 +69,7 @@ object ConfigCommand {
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
     val entityType = opts.options.valueOf(opts.entityType)
     val entityName = opts.options.valueOf(opts.entityName)
+    warnOnMaxMessagesChange(configsToBeAdded)
 
     // compile the final set of configs
     val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName)
@@ -82,6 +85,17 @@ object ConfigCommand {
     }
   }
 
+  def warnOnMaxMessagesChange(configs: Properties): Unit = {
+    val maxMessageBytes = configs.get(LogConfig.MaxMessageBytesProp) match {
+      case n: String => n.toInt
+      case _ => -1
+    }
+    if (maxMessageBytes > Defaults.MaxMessageSize){
+      error(TopicCommand.longMessageSizeWarning(maxMessageBytes))
+      TopicCommand.askToProceed
+    }
+  }
+
   private def describeConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) {
     val entityType = opts.options.valueOf(opts.entityType)
     val entityNames: Seq[String] =

http://git-wip-us.apache.org/repos/asf/kafka/blob/44f6c4b9/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 9fe2606..e6ca112 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -27,8 +27,8 @@ import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection._
 import scala.collection.JavaConversions._
-import kafka.log.LogConfig
-import kafka.consumer.Whitelist
+import kafka.log.{Defaults, LogConfig}
+import kafka.consumer.{ConsumerConfig, Whitelist}
 import kafka.server.{ConfigType, OffsetManager}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.security.JaasUtils
@@ -96,11 +96,13 @@ object TopicCommand extends Logging {
       println("WARNING: Due to limitations in metric names, topics with a period ('.') or
underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
     if (opts.options.has(opts.replicaAssignmentOpt)) {
       val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
+      warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length)
       AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment,
configs, update = false)
     } else {
       CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
       val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
       val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
+      warnOnMaxMessagesChange(configs, replicas)
       AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs)
     }
     println("Created topic \"%s\".".format(topic))
@@ -333,4 +335,54 @@ object TopicCommand extends Logging {
         allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt)
     }
   }
+  def warnOnMaxMessagesChange(configs: Properties, replicas: Integer): Unit = {
+    val maxMessageBytes =  configs.get(LogConfig.MaxMessageBytesProp) match {
+      case n: String => n.toInt
+      case _ => -1
+    }
+    if (maxMessageBytes > Defaults.MaxMessageSize)
+      if (replicas > 1) {
+        error(longMessageSizeWarning(maxMessageBytes))
+        askToProceed
+      }
+      else
+        warn(shortMessageSizeWarning(maxMessageBytes))
+  }
+
+  def askToProceed: Unit = {
+    println("Are you sure you want to continue? [y/n]")
+    if (!Console.readLine().equalsIgnoreCase("y")) {
+      println("Ending your session")
+      System.exit(0)
+    }
+  }
+
+  def shortMessageSizeWarning(maxMessageBytes: Int): String = {
+    "\n\n" +
+      "*****************************************************************************************************\n"
+
+      "*** WARNING: you are creating a topic where the the max.message.bytes is greater than
the consumer ***\n" +
+      "*** default. This operation is potentially dangerous. Consumers will get failures
if their        ***\n" +
+      "*** fetch.message.max.bytes < the value you are using.                        
                   ***\n" +
+      "*****************************************************************************************************\n"
+
+      s"- value set here: $maxMessageBytes\n" +
+      s"- Default Consumer fetch.message.max.bytes: ${ConsumerConfig.FetchSize}\n" +
+      s"- Default Broker max.message.bytes: ${kafka.server.Defaults.MessageMaxBytes}\n\n"
+  }
+
+  def longMessageSizeWarning(maxMessageBytes: Int): String = {
+    "\n\n" +
+      "****************************************************************************************************\n"
+
+      "*** WARNING: you are creating a topic where the max.message.bytes is greater than
the broker      ***\n" +
+      "*** default. This operation is dangerous. There are two potential side effects:  
               ***\n" +
+      "*** - Consumers will get failures if their fetch.message.max.bytes < the value
you are using     ***\n" +
+      "*** - Producer requests larger than replica.fetch.max.bytes will not replicate and
hence have    ***\n" +
+      "***   a higher risk of data loss                                                 
               ***\n" +
+      "*** You should ensure both of these settings are greater than the value set here before
using    ***\n" +
+      "*** this topic.                                                                  
               ***\n" +
+      "****************************************************************************************************\n"
+
+      s"- value set here: $maxMessageBytes\n" +
+      s"- Default Broker replica.fetch.max.bytes: ${kafka.server.Defaults.ReplicaFetchMaxBytes}\n"
+
+      s"- Default Broker max.message.bytes: ${kafka.server.Defaults.MessageMaxBytes}\n" +
+      s"- Default Consumer fetch.message.max.bytes: ${ConsumerConfig.FetchSize}\n\n"
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/44f6c4b9/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 5aa817d..5993bbb 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -99,6 +99,7 @@ class ReplicaFetcherThread(name: String,
       val TopicAndPartition(topic, partitionId) = topicAndPartition
       val replica = replicaMgr.getReplica(topic, partitionId).get
       val messageSet = partitionData.toByteBufferMessageSet
+      warnIfMessageOversized(messageSet)
 
       if (fetchOffset != replica.logEndOffset.messageOffset)
         throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset
= %d.".format(fetchOffset, replica.logEndOffset.messageOffset))
@@ -121,6 +122,14 @@ class ReplicaFetcherThread(name: String,
     }
   }
 
+  def warnIfMessageOversized(messageSet: ByteBufferMessageSet): Unit = {
+    if (messageSet.sizeInBytes > 0 && messageSet.validBytes <= 0)
+      error("Replication is failing due to a message that is greater than replica.fetch.max.bytes.
This " +
+        "generally occurs when the max.message.bytes has been overridden to exceed this value
and a suitably large " +
+        "message has also been sent. To fix this problem increase replica.fetch.max.bytes
in your broker config to be " +
+        "equal or larger than your settings for max.message.bytes, both at a broker and topic
level.")
+  }
+
   /**
    * Handle a partition whose offset is out of range and return a new fetch offset.
    */


Mime
View raw message