kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: resolve conflicts
Date Thu, 30 Jun 2016 18:16:03 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 4091a13c5 -> 3ff68ef6c


resolve conflicts


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3ff68ef6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3ff68ef6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3ff68ef6

Branch: refs/heads/0.10.0
Commit: 3ff68ef6c9da82bf4e69bb37ca2f1cd7403e59dd
Parents: 4091a13
Author: Yuto Kawamura <kawamuray.dadada@gmail.com>
Authored: Thu Jun 30 11:12:34 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Jun 30 11:15:54 2016 -0700

----------------------------------------------------------------------
 .../processor/internals/RecordCollector.java    | 29 +++++++++++---------
 1 file changed, 16 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3ff68ef6/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index eb731be..fea616f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -49,18 +49,6 @@ public class RecordCollector {
 
     private final Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
-    private final Callback callback = new Callback() {
-        @Override
-        public void onCompletion(RecordMetadata metadata, Exception exception) {
-            if (exception == null) {
-                TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
-                offsets.put(tp, metadata.offset());
-            } else {
-                log.error("Error sending record: " + metadata, exception);
-            }
-        }
-    };
-
 
     public RecordCollector(Producer<byte[], byte[]> producer) {
         this.producer = producer;
@@ -81,7 +69,22 @@ public class RecordCollector {
             if (partitions != null)
                 partition = partitioner.partition(record.key(), record.value(), partitions.size());
         }
-        this.producer.send(new ProducerRecord<>(record.topic(), partition, keyBytes, valBytes), callback);
+
+        ProducerRecord<byte[], byte[]> serializedRecord =
+                new ProducerRecord<>(record.topic(), partition, record.timestamp(), keyBytes, valBytes);
+        final String topic = serializedRecord.topic();
+
+        this.producer.send(serializedRecord, new Callback() {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                if (exception == null) {
+                    TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
+                    offsets.put(tp, metadata.offset());
+                } else {
+                    log.error("Error sending record to topic {}", topic, exception);
+                }
+            }
+        });
     }
 
     public void flush() {


Mime
View raw message