kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [35/36] git commit: kafka-1035; Add message-send-max-retries and retry-backoff-ms options to console producer; patched by Rajasekar Elango; reviewed by Guaozhang Wang and Jun Rao
Date Wed, 11 Sep 2013 17:04:20 GMT
kafka-1035; Add message-send-max-retries and retry-backoff-ms options to console producer;
patched by Rajasekar Elango; reviewed by Guaozhang Wang and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da451217
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da451217
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da451217

Branch: refs/heads/trunk
Commit: da4512174b6f7c395ffe053a86d2c6bb19d2538a
Parents: 20953b5
Author: Rajasekar Elango <e.rajasekar@gmail.com>
Authored: Thu Sep 5 07:45:01 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Sep 5 07:45:01 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/producer/ConsoleProducer.scala    | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/da451217/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 5539bce..00cb2e8 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -44,6 +44,14 @@ object ConsoleProducer {
                              .describedAs("size")
                              .ofType(classOf[java.lang.Integer])
                              .defaultsTo(200)
+    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can
fail receiving the message for multiple reasons, and being unavailable transiently is just
one of them. This property specifies the number of retires before the producer give up and
drop this message.")
+                             .withRequiredArg
+                             .ofType(classOf[java.lang.Integer])
+                             .defaultsTo(3)
+    val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer
refreshes the metadata of relevant topics. Since leader election takes a bit of time, this
property specifies the amount of time that the producer waits before refreshing the metadata.")
+                             .withRequiredArg
+                             .ofType(classOf[java.lang.Long])
+                             .defaultsTo(100)
     val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in
asynchronous mode, this gives the maximum amount of time" + 
                                                    " a message will queue awaiting suffient
batch size. The value is given in ms.")
                                .withRequiredArg
@@ -97,7 +105,7 @@ object ConsoleProducer {
                             .withRequiredArg
                             .describedAs("prop")
                             .ofType(classOf[String])
-
+                            
 
     val options = parser.parse(args : _*)
     for(arg <- List(topicOpt, brokerListOpt)) {
@@ -132,6 +140,9 @@ object ConsoleProducer {
     props.put("producer.type", if(sync) "sync" else "async")
     if(options.has(batchSizeOpt))
       props.put("batch.num.messages", batchSize.toString)
+    
+    props.put("message.send.max.retries", options.valueOf(messageSendMaxRetriesOpt).toString)
+    props.put("retry.backoff.ms", options.valueOf(retryBackoffMsOpt).toString)
     props.put("queue.buffering.max.ms", sendTimeout.toString)
     props.put("queue.buffering.max.messages", queueSize.toString)
     props.put("queue.enqueue.timeout.ms", queueEnqueueTimeoutMs.toString)


Mime
View raw message