kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-870 hadoop-producer KafkaRecordWriter writes entire input buffer capacity, even when intended payload is smaller; reviewed by Neha Narkhede
Date Tue, 23 Apr 2013 02:20:03 GMT
Updated Branches:
  refs/heads/0.8 f1d2141ca -> 1976ce806


KAFKA-870 hadoop-producer KafkaRecordWriter writes entire input buffer capacity, even when
intended payload is smaller; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1976ce80
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1976ce80
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1976ce80

Branch: refs/heads/0.8
Commit: 1976ce8069818c2c6d093b5b75e8f7b6653120bc
Parents: f1d2141
Author: David Stein <dstein205@gmail.com>
Authored: Mon Apr 22 19:19:13 2013 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Mon Apr 22 19:20:01 2013 -0700

----------------------------------------------------------------------
 .../kafka/bridge/hadoop/KafkaRecordWriter.java     |    5 ++++-
 1 files changed, 4 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1976ce80/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
index a381ccd..6eea635 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
@@ -17,6 +17,7 @@
 package kafka.bridge.hadoop;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import kafka.javaapi.producer.Producer;
@@ -62,7 +63,9 @@ public class KafkaRecordWriter<K,V> extends RecordWriter<K,V>
     if (value instanceof byte[])
       valBytes = (byte[]) value;
     else if (value instanceof BytesWritable)
-      valBytes = ((BytesWritable) value).getBytes();
+      // BytesWritable.getBytes returns its internal buffer, so .length would refer to its
capacity, not the
+      // intended size of the byte array contained.  We need to use BytesWritable.getLength
for the true size.
+      valBytes = Arrays.copyOf(((BytesWritable) value).getBytes(), ((BytesWritable) value).getLength());
     else
       throw new IllegalArgumentException("KafkaRecordWriter expects byte array value to publish");
 


Mime
View raw message