kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Suppress ProducerConfig warning in MirrorMaker
Date Mon, 03 Apr 2017 23:18:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b9b2cfc28 -> ca2979f84


MINOR: Suppress ProducerConfig warning in MirrorMaker

Though MirrorMaker uses the `producer.type` value of the
producer properties, ProducerConfig show the warning:
`The configuration 'producer.type' was supplied but
isn't a known config.`

Author: Shun Takebayashi <shun@takebayashi.asia>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2676 from takebayashi/suppress-mirrormaker-warning


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ca2979f8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ca2979f8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ca2979f8

Branch: refs/heads/trunk
Commit: ca2979f847ead39b4480f31085c5cf26bb102080
Parents: b9b2cfc
Author: Shun Takebayashi <shun@takebayashi.asia>
Authored: Tue Apr 4 00:16:30 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Apr 4 00:16:35 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/MirrorMaker.scala            | 8 ++++----
 .../integration/kafka/tools/MirrorMakerIntegrationTest.scala | 3 +--
 2 files changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ca2979f8/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index d359c1a..5d88b4e 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -225,6 +225,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
       // create producer
       val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
+      val sync = producerProps.getProperty("producer.type", "async").equals("sync")
+      producerProps.remove("producer.type")
       // Defaults to no data loss settings.
       maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)
       maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
@@ -233,7 +235,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       // Always set producer key and value serializer to ByteArraySerializer.
       producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
       producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-      producer = new MirrorMakerProducer(producerProps)
+      producer = new MirrorMakerProducer(sync, producerProps)
 
       // Create consumers
       val mirrorMakerConsumers = if (useOldConsumer) {
@@ -696,9 +698,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
   }
 
-  private[tools] class MirrorMakerProducer(val producerProps: Properties) {
-
-    val sync = producerProps.getProperty("producer.type", "async").equals("sync")
+  private[tools] class MirrorMakerProducer(val sync: Boolean, val producerProps: Properties)
{
 
     val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca2979f8/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
index 465e8de..b7b1a12 100644
--- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -41,10 +41,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
 
     val producerProps = new Properties
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put("producer.type", "sync")
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
-    val producer = new MirrorMakerProducer(producerProps)
+    val producer = new MirrorMakerProducer(true, producerProps)
     MirrorMaker.producer = producer
     MirrorMaker.producer.send(new ProducerRecord(topic, msg.getBytes()))
     MirrorMaker.producer.close()


Mime
View raw message