kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: KAFKA-3273; MessageFormatter and MessageReader interfaces should be resilient to changes
Date Tue, 01 Mar 2016 02:53:04 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 74eff8a83 -> 695fdc69d


KAFKA-3273; MessageFormatter and MessageReader interfaces should be resilient to changes

* Change `MessageFormatter.writeTo` to take a `ConsumerRecord`
* Change `MessageReader.readMessage()` to use `ProducerRecord`

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #972 from ijuma/kafka-3273-message-formatter-and-reader-resilient
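
To illustrate why passing a single record object makes the interface resilient to changes, here is a minimal, self-contained sketch. It is not code from this commit: `Record`, `Formatter`, and `OffsetPrefixFormatter` are hypothetical stand-ins that mirror the shape of `ConsumerRecord` and `MessageFormatter` so the example runs without Kafka on the classpath.

```scala
import java.io.PrintStream
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical stand-in for ConsumerRecord (assumption: only these fields are needed here).
final case class Record(topic: String, partition: Int, offset: Long,
                        key: Array[Byte], value: Array[Byte])

// Hypothetical trait with the same shape as the new MessageFormatter.writeTo:
// the record is the only data-carrying parameter.
trait Formatter {
  def writeTo(record: Record, output: PrintStream): Unit
}

// Example implementation: prints "topic-partition@offset: value".
class OffsetPrefixFormatter extends Formatter {
  def writeTo(record: Record, output: PrintStream): Unit = {
    // A null value is printed as the literal string "null".
    val value = Option(record.value).map(bytes => new String(bytes, UTF_8)).getOrElse("null")
    output.println(s"${record.topic}-${record.partition}@${record.offset}: $value")
  }
}

object FormatterSketch {
  def main(args: Array[String]): Unit = {
    val record = Record("test", 0, 42L, null, "hello".getBytes(UTF_8))
    new OffsetPrefixFormatter().writeTo(record, System.out) // prints "test-0@42: hello"
  }
}
```

If a field were later added to `Record`, the `writeTo` signature and existing implementations would stay source-compatible, whereas the old positional signature (`key, value, timestamp, timestampType, output`) had to change whenever a field was added.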


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/695fdc69
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/695fdc69
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/695fdc69

Branch: refs/heads/trunk
Commit: 695fdc69db6e080419bb05d624e91fa88d5c0a02
Parents: 74eff8a
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Mon Feb 29 18:52:54 2016 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Feb 29 18:52:54 2016 -0800

----------------------------------------------------------------------
 .../scala/kafka/common/MessageFormatter.scala   | 39 +++++++++++++
 .../main/scala/kafka/common/MessageReader.scala | 39 +++++++++++++
 .../coordinator/GroupMetadataManager.scala      | 60 +++++++++++---------
 .../scala/kafka/tools/ConsoleConsumer.scala     | 45 +++++++--------
 .../scala/kafka/tools/ConsoleProducer.scala     | 40 +++++--------
 .../scala/kafka/tools/SimpleConsumerShell.scala | 31 +++++-----
 .../unit/kafka/tools/ConsoleConsumerTest.scala  |  3 +-
 docs/upgrade.html                               | 11 +++-
 8 files changed, 176 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/common/MessageFormatter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/MessageFormatter.scala b/core/src/main/scala/kafka/common/MessageFormatter.scala
new file mode 100644
index 0000000..ef3c723
--- /dev/null
+++ b/core/src/main/scala/kafka/common/MessageFormatter.scala
@@ -0,0 +1,39 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.common
+
+import java.io.PrintStream
+import java.util.Properties
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+/**
+  * Typical implementations of this interface convert a `ConsumerRecord` into a type that can then be passed to
+  * a `PrintStream`.
+  *
+  * This is used by the `ConsoleConsumer`.
+  */
+trait MessageFormatter {
+
+  def init(props: Properties) {}
+
+  def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit
+
+  def close() {}
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/common/MessageReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/MessageReader.scala b/core/src/main/scala/kafka/common/MessageReader.scala
new file mode 100644
index 0000000..56b55ce
--- /dev/null
+++ b/core/src/main/scala/kafka/common/MessageReader.scala
@@ -0,0 +1,39 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.common
+
+import java.io.InputStream
+import java.util.Properties
+
+import org.apache.kafka.clients.producer.ProducerRecord
+
+/**
+  * Typical implementations of this interface convert data from an `InputStream` received via `init` into a
+  * `ProducerRecord` instance on each invocation of `readMessage`.
+  *
+  * This is used by the `ConsoleProducer`.
+  */
+trait MessageReader {
+
+  def init(inputStream: InputStream, props: Properties) {}
+
+  def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]
+
+  def close() {}
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index b3e1bc1..2c29172 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -22,23 +22,22 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import kafka.utils.CoreUtils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.protocol.types.{ArrayOf, Struct, Schema, Field}
+import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}
 import org.apache.kafka.common.protocol.types.Type.STRING
 import org.apache.kafka.common.protocol.types.Type.INT32
 import org.apache.kafka.common.protocol.types.Type.INT64
 import org.apache.kafka.common.protocol.types.Type.BYTES
-import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.utils.Time
-
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import kafka.utils._
 import kafka.common._
 import kafka.message._
 import kafka.log.FileMessageSet
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
-import kafka.tools.MessageFormatter
+import kafka.common.MessageFormatter
 import kafka.server.ReplicaManager
 
 import scala.collection._
@@ -968,37 +967,46 @@ object GroupMetadataManager {
   // Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
   // (specify --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
   class OffsetsMessageFormatter extends MessageFormatter {
-    def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
-      val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))
-      // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
-      // only print if the message is an offset record
-      if (formattedKey.isInstanceOf[OffsetKey]) {
-        val groupTopicPartition = formattedKey.asInstanceOf[OffsetKey].toString
-        val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
-        output.write(groupTopicPartition.getBytes)
-        output.write("::".getBytes)
-        output.write(formattedValue.getBytes)
-        output.write("\n".getBytes)
+    def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
+      Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
+        // Only print if the message is an offset record.
+        // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
+        case offsetKey: OffsetKey =>
+          val groupTopicPartition = offsetKey.key
+          val value = consumerRecord.value
+          val formattedValue =
+            if (value == null) "NULL"
+            else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
+          output.write(groupTopicPartition.toString.getBytes)
+          output.write("::".getBytes)
+          output.write(formattedValue.getBytes)
+          output.write("\n".getBytes)
+        case _ => // no-op
       }
     }
   }
 
   // Formatter for use with tools to read group metadata history
   class GroupMetadataMessageFormatter extends MessageFormatter {
-    def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
-      val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))
-      // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
-      // only print if the message is a group metadata record
-      if (formattedKey.isInstanceOf[GroupMetadataKey]) {
-        val groupId = formattedKey.asInstanceOf[GroupMetadataKey].key
-        val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
-        output.write(groupId.getBytes)
-        output.write("::".getBytes)
-        output.write(formattedValue.getBytes)
-        output.write("\n".getBytes)
+    def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
+      Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
+        // Only print if the message is a group metadata record.
+        // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
+        case groupMetadataKey: GroupMetadataKey =>
+          val groupId = groupMetadataKey.key
+          val value = consumerRecord.value
+          val formattedValue =
+            if (value == null) "NULL"
+            else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
+          output.write(groupId.getBytes)
+          output.write("::".getBytes)
+          output.write(formattedValue.getBytes)
+          output.write("\n".getBytes)
+        case _ => // no-op
       }
     }
   }
+
 }
 
 case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 7aee7ab..855025e 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -20,20 +20,21 @@ package kafka.tools
 import java.io.PrintStream
 import java.util.concurrent.CountDownLatch
 import java.util.{Properties, Random}
+
 import joptsimple._
-import kafka.common.StreamEndException
+import kafka.common.{MessageFormatter, StreamEndException}
 import kafka.consumer._
 import kafka.message._
 import kafka.metrics.KafkaMetricsReporter
 import kafka.utils._
-import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.serialization.{Deserializer, ByteArrayDeserializer}
+import org.apache.kafka.common.serialization.Deserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.log4j.Logger
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  * Consumer that dumps messages to standard out.
@@ -126,7 +127,8 @@ object ConsoleConsumer extends Logging {
       }
       messageCount += 1
       try {
-        formatter.writeTo(msg.key, msg.value, msg.timestamp, msg.timestampType, System.out)
+        formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.timestampType,
+          msg.key, msg.value), System.out)
       } catch {
         case e: Throwable =>
           if (skipMessageOnError) {
@@ -285,7 +287,7 @@ object ConsoleConsumer extends Logging {
     val fromBeginning = options.has(resetBeginningOpt)
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
-    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
+    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
     val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
     val timeoutMs = if (options.has(timeoutMsOpt)) options.valueOf(timeoutMsOpt).intValue else -1
     val bootstrapServer = options.valueOf(bootstrapServerOpt)
@@ -310,9 +312,9 @@ object ConsoleConsumer extends Logging {
     }
 
     //Provide the consumer with a randomly assigned group id
-    if(!consumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-      consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"console-consumer-" + new Random().nextInt(100000))
-      groupIdPassed=false
+    if (!consumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+      consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new Random().nextInt(100000)}")
+      groupIdPassed = false
     }
 
     def tryParse(parser: OptionParser, args: Array[String]) = {
@@ -336,14 +338,6 @@ object ConsoleConsumer extends Logging {
   }
 }
 
-trait MessageFormatter{
-  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream)
-
-  def init(props: Properties) {}
-
-  def close() {}
-}
-
 class DefaultMessageFormatter extends MessageFormatter {
   var printKey = false
   var printTimestamp = false
@@ -370,7 +364,7 @@ class DefaultMessageFormatter extends MessageFormatter {
       valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
   }
 
-  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
+  def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
 
     def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], separator: Array[Byte]) {
       val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes)
@@ -379,6 +373,8 @@ class DefaultMessageFormatter extends MessageFormatter {
       output.write(separator)
     }
 
+    import consumerRecord._
+
     if (printTimestamp) {
       if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
         output.write(s"$timestampType:$timestamp".getBytes)
@@ -386,6 +382,7 @@ class DefaultMessageFormatter extends MessageFormatter {
         output.write(s"NO_TIMESTAMP".getBytes)
       output.write(keySeparator)
     }
+
     if (printKey) write(keyDeserializer, key, keySeparator)
     write(valueDeserializer, value, lineSeparator)
   }
@@ -395,9 +392,10 @@ class LoggingMessageFormatter extends MessageFormatter   {
   private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter
   val logger = Logger.getLogger(getClass().getName)
 
-  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream): Unit = {
-    defaultWriter.writeTo(key, value, timestamp, timestampType, output)
-    if(logger.isInfoEnabled)
+  def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
+    import consumerRecord._
+    defaultWriter.writeTo(consumerRecord, output)
+    if (logger.isInfoEnabled)
       logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} +
                   s"key:${if (key == null) "null" else new String(key)}, " +
                   s"value:${if (value == null) "null" else new String(value)}")
@@ -407,7 +405,7 @@ class LoggingMessageFormatter extends MessageFormatter   {
 class NoOpMessageFormatter extends MessageFormatter {
   override def init(props: Properties) {}
 
-  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream){}
+  def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream){}
 }
 
 class ChecksumMessageFormatter extends MessageFormatter {
@@ -421,7 +419,8 @@ class ChecksumMessageFormatter extends MessageFormatter {
       topicStr = ""
   }
 
-  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
+  def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
+    import consumerRecord._
     val chksum =
       if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
         new Message(value, key, timestamp, timestampType, NoCompressionCodec, 0, -1, Message.MagicValue_V1).checksum

http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index bce819e..0116a96 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -20,14 +20,13 @@ package kafka.tools
 import kafka.common._
 import kafka.message._
 import kafka.serializer._
-import kafka.utils.{ToolsUtils, CommandLineUtils}
-import kafka.producer.{NewShinyProducer,OldProducer,KeyedMessage}
-
+import kafka.utils.{CommandLineUtils, ToolsUtils}
+import kafka.producer.{NewShinyProducer, OldProducer}
 import java.util.Properties
 import java.io._
 
 import joptsimple._
-import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.utils.Utils
 
 object ConsoleProducer {
@@ -52,12 +51,12 @@ object ConsoleProducer {
           }
         })
 
-        var message: KeyedMessage[Array[Byte], Array[Byte]] = null
+        var message: ProducerRecord[Array[Byte], Array[Byte]] = null
         do {
           message = reader.readMessage()
-          if(message != null)
-            producer.send(message.topic, message.key, message.message)
-        } while(message != null)
+          if (message != null)
+            producer.send(message.topic, message.key, message.value)
+        } while (message != null)
     } catch {
       case e: joptsimple.OptionException =>
         System.err.println(e.getMessage)
@@ -285,12 +284,6 @@ object ConsoleProducer {
     val maxBlockMs = options.valueOf(maxBlockMsOpt)
   }
 
-  trait MessageReader {
-    def init(inputStream: InputStream, props: Properties) {}
-    def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]
-    def close() {}
-  }
-
   class LineMessageReader extends MessageReader {
     var topic: String = null
     var reader: BufferedReader = null
@@ -301,11 +294,11 @@ object ConsoleProducer {
 
     override def init(inputStream: InputStream, props: Properties) {
       topic = props.getProperty("topic")
-      if(props.containsKey("parse.key"))
+      if (props.containsKey("parse.key"))
         parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true")
-      if(props.containsKey("key.separator"))
+      if (props.containsKey("key.separator"))
         keySeparator = props.getProperty("key.separator")
-      if(props.containsKey("ignore.error"))
+      if (props.containsKey("ignore.error"))
         ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true")
       reader = new BufferedReader(new InputStreamReader(inputStream))
     }
@@ -317,17 +310,14 @@ object ConsoleProducer {
         case (line, true) =>
           line.indexOf(keySeparator) match {
             case -1 =>
-              if(ignoreError)
-                new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
-              else
-                throw new KafkaException("No key found on line " + lineNumber + ": " + line)
+              if (ignoreError) new ProducerRecord(topic, line.getBytes)
+              else throw new KafkaException(s"No key found on line ${lineNumber}: $line")
             case n =>
-              new KeyedMessage[Array[Byte], Array[Byte]](topic,
-                             line.substring(0, n).getBytes,
-                             (if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes())
+              val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes
+              new ProducerRecord(topic, line.substring(0, n).getBytes, value)
           }
         case (line, false) =>
-          new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
+          new ProducerRecord(topic, line.getBytes)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index dda9697..b4b68e0 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -21,10 +21,12 @@ import joptsimple._
 import kafka.utils._
 import kafka.consumer._
 import kafka.client.ClientUtils
-import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
+import kafka.api.{FetchRequestBuilder, OffsetRequest, Request}
 import kafka.cluster.BrokerEndPoint
+
 import scala.collection.JavaConversions._
-import kafka.common.TopicAndPartition
+import kafka.common.{MessageFormatter, TopicAndPartition}
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.utils.Utils
 
 /**
@@ -137,7 +139,7 @@ object SimpleConsumerShell extends Logging {
     // validating partition id
     val partitionsMetadata = topicsMetadata(0).partitionsMetadata
     val partitionMetadataOpt = partitionsMetadata.find(p => p.partitionId == partitionId)
-    if(!partitionMetadataOpt.isDefined) {
+    if (!partitionMetadataOpt.isDefined) {
       System.err.println("Error: partition %d does not exist for topic %s".format(partitionId, topic))
       System.exit(1)
     }
@@ -145,9 +147,9 @@ object SimpleConsumerShell extends Logging {
     // validating replica id and initializing target broker
     var fetchTargetBroker: BrokerEndPoint = null
     var replicaOpt: Option[BrokerEndPoint] = null
-    if(replicaId == UseLeaderReplica) {
+    if (replicaId == UseLeaderReplica) {
       replicaOpt = partitionMetadataOpt.get.leader
-      if(!replicaOpt.isDefined) {
+      if (!replicaOpt.isDefined) {
         System.err.println("Error: user specifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(topic, partitionId))
         System.exit(1)
       }
@@ -186,7 +188,7 @@ object SimpleConsumerShell extends Logging {
     }
 
     // initializing formatter
-    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+    val formatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
 
     val replicaString = if(replicaId > 0) "leader" else "replica"
@@ -202,7 +204,7 @@ object SimpleConsumerShell extends Logging {
         var offset = startingOffset
         var numMessagesConsumed = 0
         try {
-          while(numMessagesConsumed < maxMessages) {
+          while (numMessagesConsumed < maxMessages) {
             val fetchRequest = fetchRequestBuilder
                     .addFetch(topic, partitionId, offset, fetchSize)
                     .build()
@@ -213,15 +215,16 @@ object SimpleConsumerShell extends Logging {
               return
             }
             debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
-            for(messageAndOffset <- messageSet if(numMessagesConsumed < maxMessages)) {
+            for (messageAndOffset <- messageSet if numMessagesConsumed < maxMessages) {
               try {
                 offset = messageAndOffset.nextOffset
-                if(printOffsets)
+                if (printOffsets)
                   System.out.println("next offset = " + offset)
                 val message = messageAndOffset.message
-                val key = if(message.hasKey) Utils.readBytes(message.key) else null
+                val key = if (message.hasKey) Utils.readBytes(message.key) else null
                 val value = if (message.isNull) null else Utils.readBytes(message.payload)
-                formatter.writeTo(key, value, message.timestamp, message.timestampType, System.out)
+                formatter.writeTo(new ConsumerRecord(topic, partitionId, offset, message.timestamp,
+                  message.timestampType, key, value), System.out)
                 numMessagesConsumed += 1
               } catch {
                 case e: Throwable =>
@@ -230,7 +233,7 @@ object SimpleConsumerShell extends Logging {
                   else
                     throw e
               }
-              if(System.out.checkError()) {
+              if (System.out.checkError()) {
                 // This means no one is listening to our output stream any more, time to shutdown
                 System.err.println("Unable to write to standard out, closing consumer.")
                 formatter.close()
@@ -242,8 +245,8 @@ object SimpleConsumerShell extends Logging {
         } catch {
           case e: Throwable =>
             error("Error consuming topic, partition, replica (%s, %d, %d) with offset [%d]".format(topic, partitionId, replicaId, offset), e)
-        }finally {
-          info("Consumed " + numMessagesConsumed + " messages")
+        } finally {
+          info(s"Consumed $numMessagesConsumed messages")
         }
       }
     }, false)

http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index de92a24..31b3211 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -19,6 +19,7 @@ package kafka.tools
 
 import java.io.FileOutputStream
 
+import kafka.common.MessageFormatter
 import kafka.consumer.{BaseConsumer, BaseConsumerRecord}
 import kafka.utils.TestUtils
 import org.easymock.EasyMock
@@ -39,7 +40,7 @@ class ConsoleConsumerTest extends JUnitSuite {
 
     //Expectations
     val messageLimit: Int = 10
-    EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())).times(messageLimit)
+    EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject())).times(messageLimit)
     EasyMock.expect(consumer.receive()).andReturn(record).times(messageLimit)
 
     EasyMock.replay(consumer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/695fdc69/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 3370822..863a6fa 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -65,9 +65,14 @@ are introduced, it is important to upgrade your Kafka clusters before upgrading
     <li> Message format 0.10.0 has been introduced and it is used by default. It includes a timestamp field in the messages and relative offsets are used for compressed messages. </li>
     <li> ProduceRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0 </li>
     <li> FetchRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0 </li>
-    <li> MessageFormatter interface was changed from <code>void writeTo(byte[] key, byte[] value, PrintStream output)</code> to
-        <code>void writeTo(byte[] key, byte[] value, long timestamp, TimestampType timestampType, PrintStream output)</code> </li>
-    <li> MirrorMakerMessageHandler no longer exposes <em>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</em> method as it was never called. </li>
+    <li> MessageFormatter interface was changed from <code>def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)</code> to
+        <code>def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)</code> </li>
+    <li> MessageReader interface was changed from <code>def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]</code> to
+        <code>def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]</code> </li>
+    </li>
+    <li> MessageFormatter's package was changed from <code>kafka.tools</code> to <code>kafka.common</code> </li>
+    <li> MessageReader's package was changed from <code>kafka.tools</code> to <code>kafka.common</code> </li>
+    <li> MirrorMakerMessageHandler no longer exposes the <code>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</code> method as it was never called. </li>
 </ul>
 
 <h4><a id="upgrade_9" href="#upgrade_9">Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.9.0.0</a></h4>

