kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-1997: Follow-up patch, hardcode key/value serializer in mirror maker to byte serializer.
Date Sat, 08 Aug 2015 06:41:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 68ad80f85 -> 1ed88f0eb


KAFKA-1997: Follow-up patch, hardcode key/value serializer in mirror maker to byte serializer.

Hardcode the key/value serializer to ByteArraySerializer according to Jun’s comments.

Author: Jiangjie Qin <jqin@jqin-ld1.linkedin.biz>

Reviewers: Guozhang Wang

Closes #120 from becketqin/KAFKA-1997 and squashes the following commits:

7f2e5a6 [Jiangjie Qin] KAFKA-1997: Follow-up patch, hardcode key/value serializer in mirror maker to byte serializer.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ed88f0e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ed88f0e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ed88f0e

Branch: refs/heads/trunk
Commit: 1ed88f0eba860bce4be0269cdd4d38c2b57db7f7
Parents: 68ad80f
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Fri Aug 7 23:42:42 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Aug 7 23:42:42 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1ed88f0e/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 797b4bb..fbe0c83 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -18,8 +18,8 @@
 package kafka.tools
 
 import java.util
-import java.util.concurrent.{TimeUnit, CountDownLatch}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.{Collections, Properties}
 
 import com.yammer.metrics.core.Gauge
@@ -29,7 +29,7 @@ import kafka.javaapi.consumer.ConsumerRebalanceListener
 import kafka.message.MessageAndMetadata
 import kafka.metrics.KafkaMetricsGroup
 import kafka.serializer.DefaultDecoder
-import kafka.utils.{CommandLineUtils, Logging, CoreUtils}
+import kafka.utils.{CommandLineUtils, CoreUtils, Logging}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.utils.Utils
@@ -185,6 +185,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     maybeSetDefaultProperty(producerProps, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
     maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
     maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+    // Always set producer key and value serializer to ByteArraySerializer.
+    producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producer = new MirrorMakerProducer(producerProps)
 
     // Create consumer connector
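
Note on the hunk above: the new lines call setProperty directly instead of going through maybeSetDefaultProperty, so a serializer supplied in the user's producer config is replaced rather than kept. The standalone Scala sketch below illustrates that difference. It is not the MirrorMaker code: the body of maybeSetDefaultProperty is not shown in this diff, so the version here is an assumed "set only if absent" helper, and the object and method names (SerializerOverrideSketch, applyProducerOverrides) are hypothetical. The config keys are written as the string literals "acks", "key.serializer" and "value.serializer" so the sketch runs without the Kafka client jar.

```scala
import java.util.Properties

object SerializerOverrideSketch {
  val ByteArraySerializerName =
    "org.apache.kafka.common.serialization.ByteArraySerializer"

  // Assumption: the real helper only fills in a value the user did not set.
  def maybeSetDefaultProperty(props: Properties, name: String, value: String): Unit = {
    if (!props.containsKey(name)) props.setProperty(name, value)
  }

  // Hypothetical helper mirroring the shape of the patched block.
  def applyProducerOverrides(props: Properties): Unit = {
    // Default: a user-supplied "acks" value is kept.
    maybeSetDefaultProperty(props, "acks", "all")
    // Hardcoded: any user-supplied serializer is overwritten.
    props.setProperty("key.serializer", ByteArraySerializerName)
    props.setProperty("value.serializer", ByteArraySerializerName)
  }

  def main(args: Array[String]): Unit = {
    val userProps = new Properties()
    userProps.setProperty("acks", "1")
    userProps.setProperty("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")

    applyProducerOverrides(userProps)

    // "acks" keeps the user's value; both serializers are the byte-array one.
    println(userProps.getProperty("acks"))
    println(userProps.getProperty("key.serializer"))
    println(userProps.getProperty("value.serializer"))
  }
}
```

Under these assumptions, a user who sets acks=1 keeps that value, while a user-supplied key.serializer is replaced by the byte-array serializer.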

