kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-870 hadoop-producer KafkaRecordWriter writes entire input buffer capacity, even when intended payload is smaller; reviewed by Neha Narkhede
Date Tue, 23 Apr 2013 02:21:49 GMT
Updated Branches:
  refs/heads/0.7 4d7629dd7 -> 68c8434f6


KAFKA-870 hadoop-producer KafkaRecordWriter writes entire input buffer capacity, even when
intended payload is smaller; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68c8434f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68c8434f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68c8434f

Branch: refs/heads/0.7
Commit: 68c8434f61fc7ef1152db576b359ff88ec0a4de3
Parents: 4d7629d
Author: David Stein <dstein205@gmail.com>
Authored: Mon Apr 22 19:21:28 2013 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Mon Apr 22 19:21:42 2013 -0700

----------------------------------------------------------------------
 .../kafka/bridge/hadoop/KafkaRecordWriter.java     |    6 +++++-
 1 files changed, 5 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/68c8434f/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
index af9c650..405a898 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
@@ -18,6 +18,7 @@ package kafka.bridge.hadoop;
 
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import kafka.javaapi.producer.Producer;
@@ -56,7 +57,10 @@ public class KafkaRecordWriter<W extends BytesWritable> extends RecordWriter<Nul
   @Override
   public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException
   {
-    Message msg = new Message(value.getBytes());
+    // BytesWritable.getBytes returns its internal buffer, so .length would refer to its
capacity, not the
+    // intended size of the byte array contained.  We need to use BytesWritable.getLength
for the true size.
+    Message msg = new Message(Arrays.copyOf(value.getBytes(), value.getLength()));
+
     msgList.add(new ProducerData<Integer, Message>(this.topic, msg));
     totalSize += msg.size();
 


Mime
View raw message