kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: KAFKA-1499; Broker-side compression configuration; reviewed by Joel Koshy
Date Thu, 15 Jan 2015 02:46:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a61117840 -> 1c8f89bc7


KAFKA-1499; Broker-side compression configuration; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1c8f89bc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1c8f89bc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1c8f89bc

Branch: refs/heads/trunk
Commit: 1c8f89bc73ec1844371c812215b255db037e24a5
Parents: a611178
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Authored: Wed Jan 14 15:16:53 2015 -0800
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Wed Jan 14 18:46:38 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 23 +++---
 core/src/main/scala/kafka/log/LogConfig.scala   | 20 ++++-
 .../kafka/message/ByteBufferMessageSet.scala    | 14 ++--
 .../scala/kafka/message/CompressionCodec.scala  | 39 +++++++--
 .../main/scala/kafka/server/KafkaConfig.scala   | 11 +++
 .../main/scala/kafka/server/KafkaServer.scala   |  3 +-
 .../test/scala/kafka/log/LogConfigTest.scala    |  2 +
 .../unit/kafka/log/BrokerCompressionTest.scala  | 84 ++++++++++++++++++++
 .../message/ByteBufferMessageSetTest.scala      |  6 +-
 .../unit/kafka/server/KafkaConfigTest.scala     | 28 ++++++-
 10 files changed, 201 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1c8f89bc/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 024506c..86422bf 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -32,7 +32,7 @@ import scala.collection.JavaConversions
 import com.yammer.metrics.core.Gauge
 
 object LogAppendInfo {
-  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, NoCompressionCodec, -1, -1, false)
+  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, NoCompressionCodec, NoCompressionCodec,
-1, -1, false)
 }
 
 /**
@@ -41,10 +41,11 @@ object LogAppendInfo {
  * @param lastOffset The last offset in the message set
  * @param shallowCount The number of shallow messages
  * @param validBytes The number of valid bytes
- * @param codec The codec used in the message set
+ * @param sourceCodec The source codec used in the message set(coming from producer)
+ * @param targetCodec The target codec of the message set(after applying broker compression
logic)
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
  */
-case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec,
shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
+case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, sourceCodec: CompressionCodec,
targetCodec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
 
 
 /**
@@ -287,7 +288,7 @@ class Log(val dir: File,
           // assign offsets to the message set
           val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
           try {
-            validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
+            validMessages = validMessages.assignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec)
           } catch {
             case e: IOException => throw new KafkaException("Error in validating messages
while appending to log '%s'".format(name), e)
           }
@@ -360,7 +361,7 @@ class Log(val dir: File,
     var shallowMessageCount = 0
     var validBytesCount = 0
     var firstOffset, lastOffset = -1L
-    var codec: CompressionCodec = NoCompressionCodec
+    var sourceCodec: CompressionCodec = NoCompressionCodec
     var monotonic = true
     for(messageAndOffset <- messages.shallowIterator) {
       // update the first offset if on the first message
@@ -388,14 +389,18 @@ class Log(val dir: File,
 
       shallowMessageCount += 1
       validBytesCount += messageSize
-      
+
       val messageCodec = m.compressionCodec
       if(messageCodec != NoCompressionCodec)
-        codec = messageCodec
+        sourceCodec = messageCodec
     }
-    LogAppendInfo(firstOffset, lastOffset, codec, shallowMessageCount, validBytesCount, monotonic)
+
+    //Apply if any broker-side compression
+    val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType,
sourceCodec)
+    
+    LogAppendInfo(firstOffset, lastOffset, sourceCodec, targetCodec, shallowMessageCount,
validBytesCount, monotonic)
   }
-  
+
   /**
    * Trim any invalid bytes from the end of this message set (if there are any)
    * @param messages The message set to trim

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c8f89bc/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index ca7a99e..2338b44 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -19,9 +19,11 @@ package kafka.log
 
 import java.util.Properties
 import org.apache.kafka.common.utils.Utils
-
 import scala.collection._
 import org.apache.kafka.common.config.ConfigDef
+import kafka.common._
+import scala.collection.JavaConversions._
+import kafka.message.BrokerCompressionCodec
 
 object Defaults {
   val SegmentSize = 1024 * 1024
@@ -40,6 +42,7 @@ object Defaults {
   val Compact = false
   val UncleanLeaderElectionEnable = true
   val MinInSyncReplicas = 1
+  val CompressionType = "producer"
 }
 
 /**
@@ -59,6 +62,7 @@ object Defaults {
  * @param compact Should old segments in this log be deleted or deduplicated?
  * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled
  * @param minInSyncReplicas If number of insync replicas drops below this number, we stop
accepting writes with -1 (or all) required acks
+ * @param compressionType compressionType for a given topic
  *
  */
 case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
@@ -76,7 +80,8 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
                      val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
                      val compact: Boolean = Defaults.Compact,
                      val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
-                     val minInSyncReplicas: Int = Defaults.MinInSyncReplicas) {
+                     val minInSyncReplicas: Int = Defaults.MinInSyncReplicas,
+                     val compressionType: String = Defaults.CompressionType) {
 
   def toProps: Properties = {
     val props = new Properties()
@@ -97,6 +102,7 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
     props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
     props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
     props.put(MinInSyncReplicasProp, minInSyncReplicas.toString)
+    props.put(CompressionTypeProp, compressionType)
     props
   }
 
@@ -125,6 +131,7 @@ object LogConfig {
   val CleanupPolicyProp = "cleanup.policy"
   val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
   val MinInSyncReplicasProp = "min.insync.replicas"
+  val CompressionTypeProp = "compression.type"
 
   val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log"
   val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is
rolled"
@@ -145,6 +152,10 @@ object LogConfig {
   val UncleanLeaderElectionEnableDoc = "Indicates whether unclean leader election is enabled"
   val MinInSyncReplicasDoc = "If number of insync replicas drops below this number, we stop
accepting writes with" +
     " -1 (or all) required acks"
+  val CompressionTypeDoc = "This parameter allows you to specify the compression logic for
a given topic. This config" +
+    " is used to retain/remove/change the compression set by the producer. This config takes
the following options: " +
+    " uncompressed, gzip, snappy, lz4, producer. uncompressed means that regardless of what
the producer sets, the broker" +
+    " writes the message decompressed. producer means the broker attempts to retain whatever
is used by the producer"
 
   private val configDef = {
     import ConfigDef.Range._
@@ -174,6 +185,7 @@ object LogConfig {
       .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable,
         MEDIUM, UncleanLeaderElectionEnableDoc)
       .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), MEDIUM,
MinInSyncReplicasDoc)
+      .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(seqAsJavaList(BrokerCompressionCodec.brokerCompressionOptions)),
MEDIUM, CompressionTypeDoc)
   }
 
   def configNames() = {
@@ -181,6 +193,7 @@ object LogConfig {
     configDef.names().toList.sorted
   }
 
+
   /**
    * Parse the given properties instance into a LogConfig object
    */
@@ -202,7 +215,8 @@ object LogConfig {
                   minCleanableRatio = parsed.get(MinCleanableDirtyRatioProp).asInstanceOf[Double],
                   compact = parsed.get(CleanupPolicyProp).asInstanceOf[String].toLowerCase
!= Delete,
                   uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean],
-                  minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int])
+                  minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int],
+                  compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String].toLowerCase())
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c8f89bc/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 788c786..f46ad5c 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -194,16 +194,16 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet
with Loggi
             innerIter.next
         }
       }
-      
+
     }
   }
-  
+
   /**
    * Update the offsets for this message set. This method attempts to do an in-place conversion
    * if there is no compression, but otherwise recopies the messages
    */
-  private[kafka] def assignOffsets(offsetCounter: AtomicLong, codec: CompressionCodec): ByteBufferMessageSet
= {
-    if(codec == NoCompressionCodec) {
+  private[kafka] def assignOffsets(offsetCounter: AtomicLong, sourceCodec: CompressionCodec,
targetCodec: CompressionCodec): ByteBufferMessageSet = {
+    if(sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
       // do an in-place conversion
       var position = 0
       buffer.mark()
@@ -217,16 +217,16 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet
with Loggi
     } else {
       // messages are compressed, crack open the messageset and recompress with correct offset
       val messages = this.internalIterator(isShallow = false).map(_.message)
-      new ByteBufferMessageSet(compressionCodec = codec, offsetCounter = offsetCounter, messages
= messages.toBuffer:_*)
+      new ByteBufferMessageSet(compressionCodec = targetCodec, offsetCounter = offsetCounter,
messages = messages.toBuffer:_*)
     }
   }
- 
+
 
   /**
    * The total number of bytes in this message set, including any partial trailing messages
    */
   def sizeInBytes: Int = buffer.limit
-  
+
   /**
    * The total number of bytes in this message set not including any partial, trailing messages
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c8f89bc/core/src/main/scala/kafka/message/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala
index 9439d2b..cacde9b 100644
--- a/core/src/main/scala/kafka/message/CompressionCodec.scala
+++ b/core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -38,29 +38,58 @@ object CompressionCodec {
   }
 }
 
+object BrokerCompressionCodec {
+
+  val brokerCompressionCodecs = List(UncompressedCodec, SnappyCompressionCodec, LZ4CompressionCodec,
GZIPCompressionCodec, ProducerCompressionCodec)
+  val brokerCompressionOptions = brokerCompressionCodecs.map(codec => codec.name)
+
+  def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains((compressionType.toLowerCase()))
+
+  def getCompressionCodec(compressionType: String): CompressionCodec = {
+    compressionType.toLowerCase match {
+      case UncompressedCodec.name => NoCompressionCodec
+      case _ => CompressionCodec.getCompressionCodec(compressionType)
+    }
+  }
+
+  def getTargetCompressionCodec(compressionType: String, producerCompression: CompressionCodec):
CompressionCodec = {
+    if (ProducerCompressionCodec.name.equals(compressionType)) producerCompression
+    else getCompressionCodec(compressionType)
+  }
+}
+
 sealed trait CompressionCodec { def codec: Int; def name: String }
+sealed trait BrokerCompressionCodec { def name: String }
 
-case object DefaultCompressionCodec extends CompressionCodec {
+case object DefaultCompressionCodec extends CompressionCodec with BrokerCompressionCodec
{
   val codec = GZIPCompressionCodec.codec
   val name = GZIPCompressionCodec.name
 }
 
-case object GZIPCompressionCodec extends CompressionCodec {
+case object GZIPCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
   val codec = 1
   val name = "gzip"
 }
 
-case object SnappyCompressionCodec extends CompressionCodec {
+case object SnappyCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
   val codec = 2
   val name = "snappy"
 }
 
-case object LZ4CompressionCodec extends CompressionCodec {
+case object LZ4CompressionCodec extends CompressionCodec with BrokerCompressionCodec {
   val codec = 3
   val name = "lz4"
 }
 
-case object NoCompressionCodec extends CompressionCodec {
+case object NoCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
   val codec = 0
   val name = "none"
 }
+
+case object UncompressedCodec extends BrokerCompressionCodec {
+  val name = "uncompressed"
+}
+
+case object ProducerCompressionCodec extends BrokerCompressionCodec {
+  val name = "producer"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c8f89bc/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index bbd3fd7..9d1adec 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -21,6 +21,8 @@ import java.util.Properties
 import kafka.message.{MessageSet, Message}
 import kafka.consumer.ConsumerConfig
 import kafka.utils.{VerifiableProperties, ZKConfig, Utils}
+import kafka.message.NoCompressionCodec
+import kafka.message.BrokerCompressionCodec
 
 /**
  * Configuration settings for the kafka server
@@ -345,4 +347,13 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* Enables delete topic. Delete topic through the admin tool will have no effect if this
config is turned off */
   val deleteTopicEnable = props.getBoolean("delete.topic.enable", false)
 
+  /**
+   * This parameter allows you to specify the broker-side compression logic. This config
is used to
+   * retain/remove/change the compression set by the producer. This config takes the following
options:
+   * uncompressed, gzip, snappy, lz4, producer. uncompressed means that regardless of what
the producer sets, the broker
+   * writes the message decompressed. producer means the broker attempts to retain whatever
is used by the producer"
+  */
+  val compressionType = props.getString("compression.type", "producer").toLowerCase()
+  require(BrokerCompressionCodec.isValid(compressionType), "compression.type : "+compressionType
+ " is not valid." +
+      " Valid options are "+BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c8f89bc/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index a069eb9..89200da 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -329,7 +329,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends
Logg
                                      deleteRetentionMs = config.logCleanerDeleteRetentionMs,
                                      fileDeleteDelayMs = config.logDeleteDelayMs,
                                      minCleanableRatio = config.logCleanerMinCleanRatio,
-                                     compact = config.logCleanupPolicy.trim.toLowerCase ==
"compact")
+                                     compact = config.logCleanupPolicy.trim.toLowerCase ==
"compact",
+                                     compressionType = config.compressionType)
     val defaultProps = defaultLogConfig.toProps
     val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps,
_))
     // read the log configurations from zookeeper

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c8f89bc/core/src/test/scala/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/log/LogConfigTest.scala b/core/src/test/scala/kafka/log/LogConfigTest.scala
index 99b0df7..fe5bd9d 100644
--- a/core/src/test/scala/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/kafka/log/LogConfigTest.scala
@@ -34,6 +34,7 @@ class LogConfigTest extends JUnit3Suite {
 
     Assert.assertEquals(4242, config.segmentSize)
     Assert.assertEquals("LogConfig defaults should be retained", Defaults.MaxMessageSize,
config.maxMessageSize)
+    Assert.assertEquals("producer", config.compressionType)
   }
 
   @Test
@@ -50,6 +51,7 @@ class LogConfigTest extends JUnit3Suite {
     LogConfig.configNames().foreach((name) => {
       name match {
         case LogConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true",
"false"))
+        case LogConfig.CompressionTypeProp => expected.setProperty(name, randFrom("producer","uncompressed","gzip"))
         case LogConfig.CleanupPolicyProp => expected.setProperty(name, randFrom(LogConfig.Compact,
LogConfig.Delete))
         case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble
* .9 + .1))
         case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue
- 1) + 1).toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c8f89bc/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
new file mode 100644
index 0000000..fa4a8ad
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import kafka.utils._
+import kafka.message._
+import org.scalatest.junit.JUnitSuite
+import org.junit._
+import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import java.util.{ Collection, ArrayList }
+import kafka.server.KafkaConfig
+import org.apache.kafka.common.record.CompressionType
+import scala.collection.JavaConversions._
+
+@RunWith(value = classOf[Parameterized])
+class BrokerCompressionTest(messageCompression: String, brokerCompression: String) extends
JUnitSuite {
+
+  var logDir: File = null
+  val time = new MockTime(0)
+  val logConfig = LogConfig()
+
+  @Before
+  def setUp() {
+    logDir = TestUtils.tempDir()
+  }
+
+  @After
+  def tearDown() {
+    Utils.rm(logDir)
+  }
+
+  /**
+   * Test broker-side compression configuration
+   */
+  @Test
+  def testBrokerSideCompression() {
+    val messageCompressionCode = CompressionCodec.getCompressionCodec(messageCompression)
+
+    /*configure broker-side compression  */
+    val log = new Log(logDir, logConfig.copy(compressionType = brokerCompression), recoveryPoint
= 0L, time.scheduler, time = time)
+
+    /* append two messages */
+    log.append(new ByteBufferMessageSet(messageCompressionCode, new Message("hello".getBytes),
new Message("there".getBytes)))
+
+    def readMessage(offset: Int) = log.read(offset, 4096).messageSet.head.message
+
+    if (!brokerCompression.equals("producer")) {
+      val brokerCompressionCode = BrokerCompressionCodec.getCompressionCodec(brokerCompression)
+      assertEquals("Compression at offset 0 should produce " + brokerCompressionCode.name,
brokerCompressionCode, readMessage(0).compressionCodec)
+    }
+    else
+      assertEquals("Compression at offset 0 should produce " + messageCompressionCode.name,
messageCompressionCode, readMessage(0).compressionCodec)
+
+  }
+
+}
+
+object BrokerCompressionTest {
+  @Parameters
+  def parameters: Collection[Array[String]] = {
+     for (brokerCompression <- BrokerCompressionCodec.brokerCompressionOptions;
+         messageCompression <- CompressionType.values
+    ) yield Array(messageCompression.name, brokerCompression)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c8f89bc/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 4e45d96..716254a 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -147,11 +147,11 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     // check uncompressed offsets 
     checkOffsets(messages, 0)
     var offset = 1234567
-    checkOffsets(messages.assignOffsets(new AtomicLong(offset), NoCompressionCodec), offset)
-    
+    checkOffsets(messages.assignOffsets(new AtomicLong(offset), NoCompressionCodec , NoCompressionCodec),
offset)
+
     // check compressed messages
     checkOffsets(compressedMessages, 0)
-    checkOffsets(compressedMessages.assignOffsets(new AtomicLong(offset), DefaultCompressionCodec),
offset)
+    checkOffsets(compressedMessages.assignOffsets(new AtomicLong(offset), DefaultCompressionCodec
, DefaultCompressionCodec), offset)
   }
   
   /* check that offsets are assigned based on byte offset from the given base offset */

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c8f89bc/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2377abe..82dce80 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -21,6 +21,8 @@ import org.junit.Test
 import junit.framework.Assert._
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
+import kafka.message.GZIPCompressionCodec
+import kafka.message.NoCompressionCodec
 
 class KafkaConfigTest extends JUnit3Suite {
 
@@ -180,6 +182,30 @@ class KafkaConfigTest extends JUnit3Suite {
     assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRollTimeMillis																						
		)
 
   }
-  
 
+  @Test
+  def testDefaultCompressionType() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    val serverConfig = new KafkaConfig(props)
+
+    assertEquals(serverConfig.compressionType, "producer")
+  }
+
+  @Test
+  def testValidCompressionType() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    props.put("compression.type", "gzip")
+    val serverConfig = new KafkaConfig(props)
+
+    assertEquals(serverConfig.compressionType, "gzip")
+  }
+
+  @Test
+  def testInvalidCompressionType() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    props.put("compression.type", "abc")
+    intercept[IllegalArgumentException] {
+      new KafkaConfig(props)
+    }
+  }
 }


Mime
View raw message