kafka-commits mailing list archives

From: j...@apache.org
Subject: kafka git commit: KAFKA-3831; Prepare for updating new-consumer-based Mirror Maker's default partition assignment strategy to round robin
Date: Tue, 27 Sep 2016 02:07:35 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk be6056abc -> 60ad6d727


KAFKA-3831; Prepare for updating new-consumer-based Mirror Maker's default partition assignment strategy to round robin

This patch adds the proper warning message and the necessary doc updates for changing
Mirror Maker's default partition assignment strategy from range to round robin. The
actual switch will occur as part of a later major release cycle (to be scheduled).
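
Operators who want to make the switch ahead of the default change can set the property
named in the new warning in the consumer config file passed to Mirror Maker via
--consumer.config. A minimal sketch, assuming a typical new-consumer properties file
(the file name is illustrative):

  # consumer.properties, supplied via --consumer.config
  partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

With the property set explicitly, the warning below does not fire, since the patch only
prints it when partition.assignment.strategy is absent from the supplied config.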

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #1499 from vahidhashemian/KAFKA-3831


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/60ad6d72
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/60ad6d72
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/60ad6d72

Branch: refs/heads/trunk
Commit: 60ad6d727861a87fa756918a7be7547e9b1f4c3d
Parents: be6056a
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Mon Sep 26 19:04:22 2016 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon Sep 26 19:04:22 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/MirrorMaker.scala    | 31 ++++++++++++--------
 1 file changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/60ad6d72/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 4346074..1373f51 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -25,7 +25,7 @@ import java.util.{Collections, Properties}
 
 import com.yammer.metrics.core.Gauge
 import joptsimple.OptionParser
-import kafka.consumer.{BaseConsumerRecord, ConsumerIterator, BaseConsumer, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
+import kafka.consumer.{BaseConsumerRecord, ConsumerIterator, BaseConsumer, Blacklist, ConsumerConfig => OldConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
 import kafka.javaapi.consumer.ConsumerRebalanceListener
 import kafka.metrics.KafkaMetricsGroup
 import kafka.serializer.DefaultDecoder
@@ -43,6 +43,7 @@ import org.apache.kafka.common.record.Record
 import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 import scala.util.control.ControlThrowable
+import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
 
 /**
  * The mirror maker has the following architecture:
@@ -174,6 +175,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
       CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)
 
+      val consumerProps = Utils.loadProps(options.valueOf(consumerConfigOpt))
+
       val useNewConsumer = options.has(useNewConsumerOpt)
       if (useNewConsumer) {
         if (options.has(blacklistOpt)) {
@@ -184,6 +187,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
           error("whitelist must be specified when using new consumer in mirror maker.")
           System.exit(1)
         }
+
+        if (!consumerProps.keySet().contains(NewConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG))
+          System.err.println("WARNING: The default partition assignment strategy of the new-consumer-based
mirror maker will " +
+            "change from 'range' to 'roundrobin' in an upcoming release (so that better load
balancing can be achieved). If " +
+            "you prefer to make this switch in advance of that release add the following
to the corresponding new-consumer " +
+            "config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'")
       } else {
         if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
           error("Exactly one of whitelist or blacklist is required.")
@@ -233,7 +242,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
          throw new IllegalArgumentException("The rebalance listener should be an instance of kafka.consumer.ConsumerRebalanceListener")
         createOldConsumers(
           numStreams,
-          options.valueOf(consumerConfigOpt),
+          consumerProps,
           customRebalanceListener,
           Option(options.valueOf(whitelistOpt)),
           Option(options.valueOf(blacklistOpt)))
@@ -256,7 +265,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
             "org.apache.kafka.clients.consumer.ConsumerRebalanceListner")
         createNewConsumers(
           numStreams,
-          options.valueOf(consumerConfigOpt),
+          consumerProps,
           customRebalanceListener,
           Option(options.valueOf(whitelistOpt)))
       }
@@ -289,12 +298,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   }
 
   private def createOldConsumers(numStreams: Int,
-                                consumerConfigPath: String,
-                                customRebalanceListener: Option[ConsumerRebalanceListener],
-                                whitelist: Option[String],
-                                blacklist: Option[String]) : Seq[MirrorMakerBaseConsumer] = {
-    // Create consumer connector
-    val consumerConfigProps = Utils.loadProps(consumerConfigPath)
+                                 consumerConfigProps: Properties,
+                                 customRebalanceListener: Option[ConsumerRebalanceListener],
+                                 whitelist: Option[String],
+                                 blacklist: Option[String]) : Seq[MirrorMakerBaseConsumer] = {
     // Disable consumer auto offsets commit to prevent data loss.
     maybeSetDefaultProperty(consumerConfigProps, "auto.commit.enable", "false")
     // Set the consumer timeout so we will not block for low volume pipeline. The timeout is necessary to make sure
@@ -304,7 +311,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     val groupIdString = consumerConfigProps.getProperty("group.id")
     val connectors = (0 until numStreams) map { i =>
       consumerConfigProps.setProperty("client.id", groupIdString + "-" + i.toString)
-      val consumerConfig = new ConsumerConfig(consumerConfigProps)
+      val consumerConfig = new OldConsumerConfig(consumerConfigProps)
       new ZookeeperConsumerConnector(consumerConfig)
     }
 
@@ -324,11 +331,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   }
 
   def createNewConsumers(numStreams: Int,
-                         consumerConfigPath: String,
+                         consumerConfigProps: Properties,
                          customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener],
                          whitelist: Option[String]) : Seq[MirrorMakerBaseConsumer] = {
-    // Create consumer connector
-    val consumerConfigProps = Utils.loadProps(consumerConfigPath)
     // Disable consumer auto offsets commit to prevent data loss.
     maybeSetDefaultProperty(consumerConfigProps, "enable.auto.commit", "false")
     // Hardcode the deserializer to ByteArrayDeserializer
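
For reference, a hedged end-to-end sketch of exercising the new warning (topic pattern
and file names are illustrative; --new.consumer, --consumer.config, --producer.config,
and --whitelist are Mirror Maker's existing flags):

  bin/kafka-mirror-maker.sh --new.consumer \
    --consumer.config consumer.properties \
    --producer.config producer.properties \
    --whitelist 'my-topic.*'

If consumer.properties leaves partition.assignment.strategy unset, the WARNING above is
printed to stderr at startup; setting it (for example to the RoundRobinAssignor, as in
the message) suppresses the warning.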

