kafka-commits mailing list archives

From jun...@apache.org
Subject svn commit: r1202045 - in /incubator/kafka/trunk: core/src/main/scala/kafka/message/ core/src/test/scala/unit/kafka/message/ project/build/
Date Tue, 15 Nov 2011 05:02:09 GMT
Author: junrao
Date: Tue Nov 15 05:02:08 2011
New Revision: 1202045

URL: http://svn.apache.org/viewvc?rev=1202045&view=rev
Log:
Add Snappy Compression as a Codec; patched by Joe Stein; reviewed by Neha Narkhede and Jun Rao; KAFKA-187

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionCodec.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala
    incubator/kafka/trunk/project/build/KafkaProject.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionCodec.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionCodec.scala?rev=1202045&r1=1202044&r2=1202045&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionCodec.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionCodec.scala Tue Nov 15 05:02:08 2011
@@ -20,8 +20,9 @@ package kafka.message
 object CompressionCodec {
   def getCompressionCodec(codec: Int): CompressionCodec = {
     codec match {
-      case 0 => NoCompressionCodec
-      case 1 => GZIPCompressionCodec
+      case NoCompressionCodec.codec => NoCompressionCodec
+      case GZIPCompressionCodec.codec => GZIPCompressionCodec
+      case SnappyCompressionCodec.codec => SnappyCompressionCodec
       case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec))
     }
   }
@@ -29,8 +30,10 @@ object CompressionCodec {
 
 sealed trait CompressionCodec { def codec: Int }
 
-case object DefaultCompressionCodec extends CompressionCodec { val codec = 1 }
+case object DefaultCompressionCodec extends CompressionCodec { val codec = GZIPCompressionCodec.codec }
 
 case object GZIPCompressionCodec extends CompressionCodec { val codec = 1 }
 
+case object SnappyCompressionCodec extends CompressionCodec { val codec = 2 }
+
 case object NoCompressionCodec extends CompressionCodec { val codec = 0 }
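
For context, the wire-level codec ids after this change are 0 (NoCompressionCodec),
1 (GZIPCompressionCodec, which is also the value behind DefaultCompressionCodec) and
2 (the new SnappyCompressionCodec). A minimal sketch of resolving an id, assuming the
kafka.message package from this revision is on the classpath:

    import kafka.message._

    // ids 0, 1 and 2 map to the case objects above; any other id throws
    // kafka.common.UnknownCodecException
    assert(CompressionCodec.getCompressionCodec(0) == NoCompressionCodec)
    assert(CompressionCodec.getCompressionCodec(1) == GZIPCompressionCodec)
    assert(CompressionCodec.getCompressionCodec(2) == SnappyCompressionCodec)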

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala?rev=1202045&r1=1202044&r2=1202045&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala Tue Nov 15 05:02:08 2011
@@ -20,125 +20,143 @@ package kafka.message
 import java.io.ByteArrayOutputStream
 import java.io.IOException
 import java.io.InputStream
-import java.util.zip.GZIPInputStream
-import java.util.zip.GZIPOutputStream
 import java.nio.ByteBuffer
 import org.apache.log4j.Logger
 
-object CompressionUtils {
-  private val logger = Logger.getLogger(getClass)
+abstract sealed class CompressionFacade(inputStream: InputStream, outputStream: ByteArrayOutputStream) {
+  def close() = {
+    if (inputStream != null) inputStream.close()
+    if (outputStream != null) outputStream.close()
+  }
+  def read(a: Array[Byte]): Int
+  def write(a: Array[Byte])
+}
 
-  def compress(messages: Iterable[Message]): Message = compress(messages, DefaultCompressionCodec)
+class GZIPCompression(inputStream: InputStream, outputStream: ByteArrayOutputStream) extends CompressionFacade(inputStream,outputStream) {
+  import java.util.zip.GZIPInputStream
+  import java.util.zip.GZIPOutputStream
+  val gzipIn:GZIPInputStream = if (inputStream == null) null else new GZIPInputStream(inputStream)
+  val gzipOut:GZIPOutputStream = if (outputStream == null) null else new GZIPOutputStream(outputStream)
+
+  override def close() {
+    if (gzipIn != null) gzipIn.close()
+    if (gzipOut != null) gzipOut.close()
+    super.close()
+  }
 
-  def compress(messages: Iterable[Message], compressionCodec: CompressionCodec):Message = compressionCodec match {
-    case DefaultCompressionCodec =>
-      val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream()
-      val gzipOutput:GZIPOutputStream = new GZIPOutputStream(outputStream)
-      if(logger.isDebugEnabled)
-        logger.debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages))
-
-      val messageByteBuffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
-      messages.foreach(m => m.serializeTo(messageByteBuffer))
-      messageByteBuffer.rewind
-
-      try {
-        gzipOutput.write(messageByteBuffer.array)
-      } catch {
-        case e: IOException => logger.error("Error while writing to the GZIP output stream", e)
-        if(gzipOutput != null) gzipOutput.close();
-        if(outputStream != null) outputStream.close()
-        throw e
-      } finally {
-        if(gzipOutput != null) gzipOutput.close()
-        if(outputStream != null) outputStream.close()
-      }
+  override def write(a: Array[Byte]) = {
+    gzipOut.write(a)
+  }
 
-      val oneCompressedMessage:Message = new Message(outputStream.toByteArray, compressionCodec)
-      oneCompressedMessage
-    case GZIPCompressionCodec =>
-      val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream()
-      val gzipOutput:GZIPOutputStream = new GZIPOutputStream(outputStream)
-      if(logger.isDebugEnabled)
-        logger.debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages))
-
-      val messageByteBuffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
-      messages.foreach(m => m.serializeTo(messageByteBuffer))
-      messageByteBuffer.rewind
-
-      try {
-        gzipOutput.write(messageByteBuffer.array)
-      } catch {
-        case e: IOException => logger.error("Error while writing to the GZIP output stream", e)
-        if(gzipOutput != null)
-          gzipOutput.close()
-        if(outputStream != null)
-          outputStream.close()
-        throw e
-      } finally {
-        if(gzipOutput != null)
-          gzipOutput.close()
-        if(outputStream != null)
-          outputStream.close()
-      }
+  override def read(a: Array[Byte]): Int = {
+    gzipIn.read(a)
+  }
+}
+
+class SnappyCompression(inputStream: InputStream, outputStream: ByteArrayOutputStream) extends CompressionFacade(inputStream,outputStream) {
+  import org.xerial.snappy.SnappyInputStream
+  import org.xerial.snappy.SnappyOutputStream
+  
+  val snappyIn:SnappyInputStream = if (inputStream == null) null else new SnappyInputStream(inputStream)
+  val snappyOut:SnappyOutputStream = if (outputStream == null) null else new SnappyOutputStream(outputStream)
+
+  override def close() = {
+    if (snappyIn != null) snappyIn.close()
+    if (snappyOut != null) snappyOut.close()
+    super.close()
+  }
 
-      val oneCompressedMessage:Message = new Message(outputStream.toByteArray, compressionCodec)
-      oneCompressedMessage
+  override def write(a: Array[Byte]) = {
+    snappyOut.write(a)
+  }
+
+  override def read(a: Array[Byte]): Int = {
+    snappyIn.read(a)
+  }
+
+}
+
+object CompressionFactory {
+  def apply(compressionCodec: CompressionCodec, stream: ByteArrayOutputStream): CompressionFacade = compressionCodec match {
+    case GZIPCompressionCodec => new GZIPCompression(null,stream)
+    case SnappyCompressionCodec => new SnappyCompression(null,stream)
     case _ =>
       throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
   }
+  def apply(compressionCodec: CompressionCodec, stream: InputStream): CompressionFacade = compressionCodec match {
+    case GZIPCompressionCodec => new GZIPCompression(stream,null)
+    case SnappyCompressionCodec => new SnappyCompression(stream,null)
+    case _ =>
+      throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
+  }
+}
 
-  def decompress(message: Message): ByteBufferMessageSet = message.compressionCodec match {
-    case DefaultCompressionCodec =>
-      val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream
-      val inputStream:InputStream = new ByteBufferBackedInputStream(message.payload)
-      val gzipIn:GZIPInputStream = new GZIPInputStream(inputStream)
-      val intermediateBuffer = new Array[Byte](1024)
-
-      try {
-        Stream.continually(gzipIn.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
-          outputStream.write(intermediateBuffer, 0, dataRead)
-        }
-      }catch {
-        case e: IOException => logger.error("Error while reading from the GZIP input stream", e)
-        if(gzipIn != null) gzipIn.close
-        if(outputStream != null) outputStream.close
-        throw e
-      } finally {
-        if(gzipIn != null) gzipIn.close
-        if(outputStream != null) outputStream.close
-      }
+object CompressionUtils {
+  private val logger = Logger.getLogger(getClass)
 
-      val outputBuffer = ByteBuffer.allocate(outputStream.size)
-      outputBuffer.put(outputStream.toByteArray)
-      outputBuffer.rewind
-      val outputByteArray = outputStream.toByteArray
-      new ByteBufferMessageSet(outputBuffer)
-    case GZIPCompressionCodec =>
-      val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream
-      val inputStream:InputStream = new ByteBufferBackedInputStream(message.payload)
-      val gzipIn:GZIPInputStream = new GZIPInputStream(inputStream)
-      val intermediateBuffer = new Array[Byte](1024)
-
-      try {
-        Stream.continually(gzipIn.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
-          outputStream.write(intermediateBuffer, 0, dataRead)
-        }
-      }catch {
-        case e: IOException => logger.error("Error while reading from the GZIP input stream", e)
-        if(gzipIn != null) gzipIn.close
-        if(outputStream != null) outputStream.close
-        throw e
-      } finally {
-        if(gzipIn != null) gzipIn.close
-        if(outputStream != null) outputStream.close
-      }
+  // the codec that is actually used when DefaultCompressionCodec is specified
+  private var defaultCodec: CompressionCodec = GZIPCompressionCodec
 
-      val outputBuffer = ByteBuffer.allocate(outputStream.size)
-      outputBuffer.put(outputStream.toByteArray)
-      outputBuffer.rewind
-      val outputByteArray = outputStream.toByteArray
-      new ByteBufferMessageSet(outputBuffer)
-    case _ =>
-      throw new kafka.common.UnknownCodecException("Unknown Codec: " + message.compressionCodec)
+  def compress(messages: Iterable[Message], compressionCodec: CompressionCodec = DefaultCompressionCodec):Message = {
+    val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream()
+
+    if(logger.isDebugEnabled)
+      logger.debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages))
+
+    var cf: CompressionFacade = null
+
+    if (compressionCodec == DefaultCompressionCodec)
+      cf = CompressionFactory(defaultCodec,outputStream)
+    else
+      cf = CompressionFactory(compressionCodec,outputStream)
+
+    val messageByteBuffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
+    messages.foreach(m => m.serializeTo(messageByteBuffer))
+    messageByteBuffer.rewind
+
+    try {
+      cf.write(messageByteBuffer.array)
+    } catch {
+      case e: IOException => logger.error("Error while writing to the GZIP output stream", e)
+      cf.close()
+      throw e
+    } finally {
+      cf.close()
+    }
+
+    val oneCompressedMessage:Message = new Message(outputStream.toByteArray, compressionCodec)
+    oneCompressedMessage
+  }
+
+  def decompress(message: Message): ByteBufferMessageSet = {
+    val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream
+    val inputStream:InputStream = new ByteBufferBackedInputStream(message.payload)
+
+    val intermediateBuffer = new Array[Byte](1024)
+
+    var cf: CompressionFacade = null
+
+    if (message.compressionCodec == DefaultCompressionCodec)
+      cf = CompressionFactory(defaultCodec,inputStream)
+    else
+      cf = CompressionFactory(message.compressionCodec,inputStream)
+
+    try {
+      Stream.continually(cf.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
+        outputStream.write(intermediateBuffer, 0, dataRead)
+      }
+    } catch {
+      case e: IOException => logger.error("Error while reading from the GZIP input stream", e)
+      cf.close()
+      throw e
+    } finally {
+      cf.close()
+    }
+
+    val outputBuffer = ByteBuffer.allocate(outputStream.size)
+    outputBuffer.put(outputStream.toByteArray)
+    outputBuffer.rewind
+    val outputByteArray = outputStream.toByteArray
+    new ByteBufferMessageSet(outputBuffer)
   }
 }
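
For context, the refactoring above replaces the duplicated per-codec pattern matching
with a CompressionFacade obtained from CompressionFactory, and resolves
DefaultCompressionCodec to GZIP via the private defaultCodec field. A minimal
round-trip sketch, assuming the kafka.message classes from this revision:

    import kafka.message.{Message, CompressionUtils, SnappyCompressionCodec}

    val messages = List(new Message("hi there".getBytes))
    // compress serializes the message set and wraps it in one Snappy-compressed Message
    val compressed = CompressionUtils.compress(messages, SnappyCompressionCodec)
    // decompress inflates that Message back into a ByteBufferMessageSet
    val messageSet = CompressionUtils.decompress(compressed)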

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala?rev=1202045&r1=1202044&r2=1202045&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala Tue Nov 15 05:02:08 2011
@@ -20,6 +20,7 @@ package kafka.message
 import kafka.utils.TestUtils
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
+import junit.framework.Assert._
 
 class CompressionUtilTest extends JUnitSuite {
 
@@ -55,4 +56,20 @@ class CompressionUtilTest extends JUnitS
 
     TestUtils.checkEquals(messages.iterator, TestUtils.getMessageIterator(decompressedMessages.iterator))
   }
+
+  @Test
+  def testSnappyCompressDecompressExplicit() {
+
+    val messages = List[Message](new Message("hi there".getBytes), new Message("I am fine".getBytes), new Message("I am not so well today".getBytes))
+
+    val message = CompressionUtils.compress(messages,SnappyCompressionCodec)
+
+    assertEquals(message.compressionCodec,SnappyCompressionCodec)
+
+    val decompressedMessages = CompressionUtils.decompress(message)
+
+    TestUtils.checkLength(decompressedMessages.iterator,3)
+
+    TestUtils.checkEquals(messages.iterator, TestUtils.getMessageIterator(decompressedMessages.iterator))
+  }
 }

Modified: incubator/kafka/trunk/project/build/KafkaProject.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/project/build/KafkaProject.scala?rev=1202045&r1=1202044&r2=1202045&view=diff
==============================================================================
--- incubator/kafka/trunk/project/build/KafkaProject.scala (original)
+++ incubator/kafka/trunk/project/build/KafkaProject.scala Tue Nov 15 05:02:08 2011
@@ -42,7 +42,7 @@ class KafkaProject(info: ProjectInfo) ex
 
 
   class CoreKafkaProject(info: ProjectInfo) extends DefaultProject(info)
-     with IdeaProject with CoreDependencies with TestDependencies {
+     with IdeaProject with CoreDependencies with TestDependencies with CompressionDependencies {
    val corePackageAction = packageAllAction
 
  //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
@@ -225,5 +225,9 @@ class KafkaProject(info: ProjectInfo) ex
     val log4j = "log4j" % "log4j" % "1.2.15"
     val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2"
   }
+  
+  trait CompressionDependencies {
+    val snappy = "org.xerial.snappy" % "snappy-java" % "1.0.4.1"
+  }
 
 }
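
For context, this sbt 0.7-style build declares dependencies as vals in traits that are
mixed into the project class; mixing CompressionDependencies into CoreKafkaProject above
is what puts snappy-java on the core classpath. The rough equivalent in a present-day
build.sbt, shown only for orientation and not part of this revision, would be:

    // illustrative modern-sbt equivalent, not part of this revision
    libraryDependencies += "org.xerial.snappy" % "snappy-java" % "1.0.4.1"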


