kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject svn commit: r1393731 - in /incubator/kafka/branches/0.8: config/producer.properties core/src/main/scala/kafka/message/CompressionCodec.scala core/src/main/scala/kafka/producer/ProducerConfig.scala
Date Wed, 03 Oct 2012 19:50:13 GMT
Author: jjkoshy
Date: Wed Oct  3 19:50:13 2012
New Revision: 1393731

URL: http://svn.apache.org/viewvc?rev=1393731&view=rev
Log:
Replace numerical compression codes in config with something human readable; KAFKA-363; patched
by David Arthur; reviewed by Joel Koshy

Modified:
    incubator/kafka/branches/0.8/config/producer.properties
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionCodec.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala

Modified: incubator/kafka/branches/0.8/config/producer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/config/producer.properties?rev=1393731&r1=1393730&r2=1393731&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/config/producer.properties (original)
+++ incubator/kafka/branches/0.8/config/producer.properties Wed Oct  3 19:50:13 2012
@@ -37,8 +37,9 @@ broker.list=localhost:9092
 # specifies whether the messages are sent asynchronously (async) or synchronously (sync)
 producer.type=sync
 
-# specify the compression codec for all data generated: 0: no compression, 1: gzip
-compression.codec=0
+# specify the compression codec for all data generated: none , gzip, snappy.
+# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectivally
+compression.codec=none
 
 # message encoder
 serializer.class=kafka.serializer.StringEncoder

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionCodec.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionCodec.scala?rev=1393731&r1=1393730&r2=1393731&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionCodec.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionCodec.scala
Wed Oct  3 19:50:13 2012
@@ -26,14 +26,34 @@ object CompressionCodec {
       case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression
codec".format(codec))
     }
   }
+  def getCompressionCodec(name: String): CompressionCodec = {
+    name.toLowerCase match {
+      case NoCompressionCodec.name => NoCompressionCodec
+      case GZIPCompressionCodec.name => GZIPCompressionCodec
+      case SnappyCompressionCodec.name => SnappyCompressionCodec
+      case _ => throw new kafka.common.UnknownCodecException("%s is an unknown compression
codec".format(name))
+    }
+  }
 }
 
-sealed trait CompressionCodec { def codec: Int }
+sealed trait CompressionCodec { def codec: Int; def name: String }
 
-case object DefaultCompressionCodec extends CompressionCodec { val codec = GZIPCompressionCodec.codec
}
+case object DefaultCompressionCodec extends CompressionCodec {
+  val codec = GZIPCompressionCodec.codec
+  val name = GZIPCompressionCodec.name
+}
 
-case object GZIPCompressionCodec extends CompressionCodec { val codec = 1 }
+case object GZIPCompressionCodec extends CompressionCodec {
+  val codec = 1
+  val name = "gzip"
+}
 
-case object SnappyCompressionCodec extends CompressionCodec { val codec = 2 }
+case object SnappyCompressionCodec extends CompressionCodec {
+  val codec = 2
+  val name = "snappy"
+}
 
-case object NoCompressionCodec extends CompressionCodec { val codec = 0 }
+case object NoCompressionCodec extends CompressionCodec {
+  val codec = 0
+  val name = "none"
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1393731&r1=1393730&r2=1393731&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala Wed
Oct  3 19:50:13 2012
@@ -56,7 +56,16 @@ class ProducerConfig private (val props:
    * This parameter allows you to specify the compression codec for all data generated *
    * by this producer. The default is NoCompressionCodec
    */
-  val compressionCodec = CompressionCodec.getCompressionCodec(props.getInt("compression.codec",
NoCompressionCodec.codec))
+  val compressionCodec = {
+    val prop = props.getString("compression.codec", NoCompressionCodec.name)
+    try {
+      CompressionCodec.getCompressionCodec(prop.toInt)
+    }
+    catch {
+      case nfe: NumberFormatException =>
+        CompressionCodec.getCompressionCodec(prop)
+    }
+  }
 
   /** This parameter allows you to set whether compression should be turned *
    *  on for particular topics



Mime
View raw message