kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1310595 - in /incubator/kafka/trunk/core/src: main/scala/kafka/consumer/ main/scala/kafka/message/ test/scala/unit/kafka/message/
Date Fri, 06 Apr 2012 21:18:57 GMT
Author: junrao
Date: Fri Apr  6 21:18:56 2012
New Revision: 1310595

URL: http://svn.apache.org/viewvc?rev=1310595&view=rev
Log:
enable shallow iterator in ByteBufferMessageSet to allow mirroring data without decompression;
patched by Jun Rao; reviewed by Joel Koshy; KAFKA-315

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1310595&r1=1310594&r2=1310595&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Fri Apr
 6 21:18:56 2012
@@ -105,5 +105,12 @@ class ConsumerConfig(props: Properties) 
 
   val mirrorConsumerNumThreads = Utils.getInt(
     props, MirrorConsumerNumThreadsProp, MirrorConsumerNumThreads)
+
+  /** Use shallow iterator over compressed messages directly. This feature should be used
very carefully.
+   *  Typically, it's only used for mirroring raw messages from one kafka cluster to another
to save the
+   *  overhead of decompression.
+   *  */
+  val enableShallowIterator = Utils.getBoolean(props, "shallowiterator.enable", false)
+
 }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1310595&r1=1310594&r2=1310595&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Fri Apr
 6 21:18:56 2012
@@ -31,7 +31,8 @@ import java.util.concurrent.atomic.Atomi
 class ConsumerIterator[T](private val topic: String,
                           private val channel: BlockingQueue[FetchedDataChunk],
                           consumerTimeoutMs: Int,
-                          private val decoder: Decoder[T])
+                          private val decoder: Decoder[T],
+                          val enableShallowIterator: Boolean)
   extends IteratorTemplate[T] with Logging {
 
   private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
@@ -74,7 +75,8 @@ class ConsumerIterator[T](private val to
                         .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset,
currentTopicInfo))
           currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
         }
-        localCurrent = currentDataChunk.messages.iterator
+        localCurrent = if (enableShallowIterator) currentDataChunk.messages.shallowIterator
+                       else currentDataChunk.messages.iterator
         current.set(localCurrent)
       }
     }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala?rev=1310595&r1=1310594&r2=1310595&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala Fri
Apr  6 21:18:56 2012
@@ -27,11 +27,12 @@ import kafka.serializer.Decoder
 class KafkaMessageStream[T](val topic: String,
                             private val queue: BlockingQueue[FetchedDataChunk],
                             consumerTimeoutMs: Int,
-                            private val decoder: Decoder[T])
+                            private val decoder: Decoder[T],
+                            val enableShallowIterator: Boolean)
    extends Iterable[T] with java.lang.Iterable[T]{
 
   private val iter: ConsumerIterator[T] =
-    new ConsumerIterator[T](topic, queue, consumerTimeoutMs, decoder)
+    new ConsumerIterator[T](topic, queue, consumerTimeoutMs, decoder, enableShallowIterator)
     
   /**
    *  Create an iterator over messages in the stream.

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1310595&r1=1310594&r2=1310595&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
(original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Fri Apr  6 21:18:56 2012
@@ -178,7 +178,7 @@ private[kafka] class ZookeeperConsumerCo
       for (threadId <- threadIdSet) {
         val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
         queues.put((topic, threadId), stream)
-        streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs,
decoder)
+        streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs,
decoder, config.enableShallowIterator)
       }
       ret += (topic -> streamList)
       debug("adding topic " + topic + " and stream to map..")

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1310595&r1=1310594&r2=1310595&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Fri
Apr  6 21:18:56 2012
@@ -78,9 +78,12 @@ class ByteBufferMessageSet(private val b
     buffer.reset()
     written
   }
-  
+
+  /** default iterator that iterates over decompressed messages */
   override def iterator: Iterator[MessageAndOffset] = internalIterator()
 
+  /** iterator over compressed messages without decompressing */
+  def shallowIterator: Iterator[MessageAndOffset] = internalIterator(true)
 
   def verifyMessageSize(maxMessageSize: Int){
     var shallowIter = internalIterator(true)
@@ -124,6 +127,9 @@ class ByteBufferMessageSet(private val b
         message.limit(size)
         topIter.position(topIter.position + size)
         val newMessage = new Message(message)
+        if(!newMessage.isValid)
+          throw new InvalidMessageException("message is invalid, compression codec: " + newMessage.compressionCodec
+            + " size: " + size + " curr offset: " + currValidBytes + " init offset: " + initialOffset)
 
         if(isShallow){
           currValidBytes += 4 + size
@@ -133,16 +139,12 @@ class ByteBufferMessageSet(private val b
         else{
           newMessage.compressionCodec match {
             case NoCompressionCodec =>
-              if(!newMessage.isValid)
-                throw new InvalidMessageException("Uncompressed essage is invalid")
               debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
               innerIter = null
               currValidBytes += 4 + size
               trace("currValidBytes = " + currValidBytes)
               new MessageAndOffset(newMessage, currValidBytes)
             case _ =>
-              if(!newMessage.isValid)
-                throw new InvalidMessageException("Compressed message is invalid")
               debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
               innerIter = CompressionUtils.decompress(newMessage).internalIterator()
               if (!innerIter.hasNext) {

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1310595&r1=1310594&r2=1310595&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
(original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
Fri Apr  6 21:18:56 2012
@@ -94,6 +94,10 @@ class ByteBufferMessageSetTest extends B
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
       //make sure the last offset after iteration is correct
       assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
+
+      //make sure shallow iterator is the same as deep iterator
+      TestUtils.checkEquals[Message](TestUtils.getMessageIterator(messageSet.shallowIterator),
+                                     TestUtils.getMessageIterator(messageSet.iterator))
     }
 
     // test for compressed regular messages
@@ -104,6 +108,8 @@ class ByteBufferMessageSetTest extends B
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
       //make sure the last offset after iteration is correct
       assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
+
+      verifyShallowIterator(messageSet)
     }
 
     // test for mixed empty and non-empty messagesets uncompressed
@@ -121,6 +127,10 @@ class ByteBufferMessageSetTest extends B
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure the last offset after iteration is correct
       assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
+
+      //make sure shallow iterator is the same as deep iterator
+      TestUtils.checkEquals[Message](TestUtils.getMessageIterator(mixedMessageSet.shallowIterator),
+                                     TestUtils.getMessageIterator(mixedMessageSet.iterator))
     }
 
     // test for mixed empty and non-empty messagesets compressed
@@ -138,7 +148,15 @@ class ByteBufferMessageSetTest extends B
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure the last offset after iteration is correct
       assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
+
+      verifyShallowIterator(mixedMessageSet)
     }
   }
 
+  def verifyShallowIterator(messageSet: ByteBufferMessageSet) {
+      //make sure the offsets returned by a shallow iterator is a subset of that of a deep
iterator
+      val shallowOffsets = messageSet.shallowIterator.map(msgAndOff => msgAndOff.offset).toSet
+      val deepOffsets = messageSet.iterator.map(msgAndOff => msgAndOff.offset).toSet
+      assertTrue(shallowOffsets.subsetOf(deepOffsets))
+  }
 }



Mime
View raw message