kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: MINOR: Fix ambiguous log message in RecordCollector
Date Thu, 30 Jun 2016 18:12:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7d6b416f9 -> 3605ffa30


MINOR: Fix ambiguous log message in RecordCollector

When a produce request fails in Kafka Streams, it logs an error like the one below:

```
Error sending record: null
```

by this line: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java#L59

This message isn't meaningful, for two reasons (see the sketch below):
- In practice, metadata is always null when exception != null: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java#L107-L109
- It is misleading, because it reads as though Kafka Streams attempted to send 'null' as a record, which is not actually the case.
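
For illustration only, here is a minimal, hypothetical sketch (not code from this patch) of the old shared-callback logging written against the plain producer API, showing why the message comes out as "Error sending record: null":

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;

// Hypothetical helper, not part of RecordCollector: it reproduces the old
// shared-callback logging to show why the message prints "null".
class SharedCallbackExample {
    static void send(Producer<byte[], byte[]> producer,
                     ProducerRecord<byte[], byte[]> record,
                     Logger log) {
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // On failure, exception != null and metadata == null, so the
                    // string concatenation renders as "Error sending record: null".
                    log.error("Error sending record: " + metadata, exception);
                }
            }
        });
    }
}
```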

PR #873 appears to be the origin of the line above; I changed it to instantiate the callback on each send so that at least the destination topic is logged.
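
For context, a condensed, hypothetical sketch of the per-send callback approach (the real change is in the diff below; the wrapper class and method signature here are illustrative only):

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;

// Hypothetical standalone version of the fix: capture the destination topic up
// front so the error can name it even though metadata is null on failure.
class PerSendCallbackExample {
    static void send(Producer<byte[], byte[]> producer,
                     ProducerRecord<byte[], byte[]> serializedRecord,
                     Logger log) {
        final String topic = serializedRecord.topic();
        producer.send(serializedRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // slf4j fills {} with the captured topic and treats the trailing
                    // exception argument as the Throwable to log.
                    log.error("Error sending record to topic {}", topic, exception);
                }
            }
        });
    }
}
```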

Author: Yuto Kawamura <kawamuray.dadada@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>

Closes #1555 from kawamuray/MINOR-record-collector-log-message


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3605ffa3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3605ffa3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3605ffa3

Branch: refs/heads/trunk
Commit: 3605ffa30602ebf9373b47f92f2a9f399a4a36db
Parents: 7d6b416
Author: Yuto Kawamura <kawamuray.dadada@gmail.com>
Authored: Thu Jun 30 11:12:34 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Jun 30 11:12:34 2016 -0700

----------------------------------------------------------------------
 .../processor/internals/RecordCollector.java    | 31 ++++++++++----------
 1 file changed, 16 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3605ffa3/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 07f9a1f..fea616f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -49,18 +49,6 @@ public class RecordCollector {
 
     private final Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
-    private final Callback callback = new Callback() {
-        @Override
-        public void onCompletion(RecordMetadata metadata, Exception exception) {
-            if (exception == null) {
-                TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
-                offsets.put(tp, metadata.offset());
-            } else {
-                log.error("Error sending record: " + metadata, exception);
-            }
-        }
-    };
-
 
     public RecordCollector(Producer<byte[], byte[]> producer) {
         this.producer = producer;
@@ -81,9 +69,22 @@ public class RecordCollector {
             if (partitions != null)
                 partition = partitioner.partition(record.key(), record.value(), partitions.size());
         }
-        this.producer.send(new ProducerRecord<>(record.topic(), partition, record.timestamp(),
-                                                keyBytes,
-                                                valBytes), callback);
+
+        ProducerRecord<byte[], byte[]> serializedRecord =
+                new ProducerRecord<>(record.topic(), partition, record.timestamp(), keyBytes, valBytes);
+        final String topic = serializedRecord.topic();
+
+        this.producer.send(serializedRecord, new Callback() {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                if (exception == null) {
+                    TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
+                    offsets.put(tp, metadata.offset());
+                } else {
+                    log.error("Error sending record to topic {}", topic, exception);
+                }
+            }
+        });
     }
 
     public void flush() {

