kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-3303; Pass partial record metadata to ProducerInterceptor.onAcknowledgement on error
Date Thu, 17 Mar 2016 00:29:33 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 858047a12 -> 9a836d015


KAFKA-3303; Pass partial record metadata to ProducerInterceptor.onAcknowledgement on error

This is a KIP-42 followup.

Currently, if sending the record fails before it gets to the server, ProducerInterceptor.onAcknowledgement()
is called with metadata == null and a non-null exception. However, it is useful to pass the topic
and partition, if known, to ProducerInterceptor.onAcknowledgement() as well. This patch ensures
that ProducerInterceptor.onAcknowledgement() gets record metadata with the topic and, if known, the partition.
If the partition is not set in 'record' and KafkaProducer.send() fails before a partition gets assigned,
then ProducerInterceptor.onAcknowledgement() gets RecordMetadata with partition == -1. The only
time ProducerInterceptor.onAcknowledgement() gets null record metadata is when the client
passes a null record to KafkaProducer.send().

Author: Anna Povzner <anna@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ashish Singh <asingh@cloudera.com>,
Jun Rao <junrao@gmail.com>

Closes #1015 from apovzner/kip42-3


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9a836d01
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9a836d01
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9a836d01

Branch: refs/heads/trunk
Commit: 9a836d0154efe6ea1effc688567186cb56265bf4
Parents: 858047a
Author: Anna Povzner <anna@confluent.io>
Authored: Wed Mar 16 17:29:29 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Mar 16 17:29:29 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   | 36 ++++++++----
 .../clients/producer/ProducerInterceptor.java   |  7 ++-
 .../kafka/clients/producer/RecordMetadata.java  |  5 ++
 .../internals/ProducerInterceptors.java         | 35 +++++++++++-
 .../internals/ProducerInterceptorsTest.java     | 58 ++++++++++++++++++++
 .../kafka/test/MockProducerInterceptor.java     |  9 ++-
 .../kafka/api/PlaintextConsumerTest.scala       |  5 +-
 7 files changed, 138 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9a836d01/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index c87973a..6acc059 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -427,9 +427,7 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback
callback) {
         // intercept the record, which can be potentially modified; this method does not
throw exceptions
         ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record
: this.interceptors.onSend(record);
-        // producer callback will make sure to call both 'callback' and interceptor callback
-        Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback,
this.interceptors);
-        return doSend(interceptedRecord, interceptCallback);
+        return doSend(interceptedRecord, callback);
     }
 
     /**
@@ -437,6 +435,7 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
      * See {@link #send(ProducerRecord, Callback)} for details.
      */
     private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback
callback) {
+        TopicPartition tp = null;
         try {
             // first make sure the metadata for the topic is available
             long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
@@ -460,10 +459,12 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
             int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
             int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey,
serializedValue);
             ensureValidRecordSize(serializedSize);
-            TopicPartition tp = new TopicPartition(record.topic(), partition);
+            tp = new TopicPartition(record.topic(), partition);
             long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
             log.trace("Sending record {} with callback {} to topic {} partition {}", record,
callback, record.topic(), partition);
-            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp,
serializedKey, serializedValue, callback, remainingWaitMs);
+            // producer callback will make sure to call both 'callback' and interceptor callback
+            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback,
this.interceptors, tp);
+            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp,
serializedKey, serializedValue, interceptCallback, remainingWaitMs);
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is either full
or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();
@@ -477,27 +478,29 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
             if (callback != null)
                 callback.onCompletion(null, e);
             this.errors.record();
+            if (this.interceptors != null)
+                this.interceptors.onSendError(record, tp, e);
             return new FutureFailure(e);
         } catch (InterruptedException e) {
             this.errors.record();
             if (this.interceptors != null)
-                this.interceptors.onAcknowledgement(null, e);
+                this.interceptors.onSendError(record, tp, e);
             throw new InterruptException(e);
         } catch (BufferExhaustedException e) {
             this.errors.record();
             this.metrics.sensor("buffer-exhausted-records").record();
             if (this.interceptors != null)
-                this.interceptors.onAcknowledgement(null, e);
+                this.interceptors.onSendError(record, tp, e);
             throw e;
         } catch (KafkaException e) {
             this.errors.record();
             if (this.interceptors != null)
-                this.interceptors.onAcknowledgement(null, e);
+                this.interceptors.onSendError(record, tp, e);
             throw e;
         } catch (Exception e) {
             // we notify interceptor about all exceptions, since onSend is called before
anything else in this method
             if (this.interceptors != null)
-                this.interceptors.onAcknowledgement(null, e);
+                this.interceptors.onSendError(record, tp, e);
             throw e;
         }
     }
@@ -763,15 +766,24 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
     private static class InterceptorCallback<K, V> implements Callback {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
+        private final TopicPartition tp;
 
-        public InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V>
interceptors) {
+        public InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V>
interceptors,
+                                   TopicPartition tp) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
+            this.tp = tp;
         }
 
         public void onCompletion(RecordMetadata metadata, Exception exception) {
-            if (this.interceptors != null)
-                this.interceptors.onAcknowledgement(metadata, exception);
+            if (this.interceptors != null) {
+                if (metadata == null) {
+                    this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, Record.NO_TIMESTAMP,
-1, -1, -1),
+                                                        exception);
+                } else {
+                    this.interceptors.onAcknowledgement(metadata, exception);
+                }
+            }
             if (this.userCallback != null)
                 this.userCallback.onCompletion(metadata, exception);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a836d01/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
index aa18fdc..e835a69 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
@@ -76,7 +76,12 @@ public interface ProducerInterceptor<K, V> extends Configurable {
      * This method will generally execute in the background I/O thread, so the implementation
should be reasonably fast.
      * Otherwise, sending of messages from other threads could be delayed.
      *
-     * @param metadata The metadata for the record that was sent (i.e. the partition and
offset). Null if an error occurred.
+     * @param metadata The metadata for the record that was sent (i.e. the partition and
offset).
+     *                 If an error occurred, metadata will contain only valid topic and maybe
+     *                 partition. If partition is not given in ProducerRecord and an error
occurs
+     *                 before partition gets assigned, then partition will be set to RecordMetadata.UNKNOWN_PARTITION.
+     *                 The metadata may be null if the client passed null record to
+     *                 {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
      * @param exception The exception thrown during processing of this record. Null if no
error occurred.
      */
     public void onAcknowledgement(RecordMetadata metadata, Exception exception);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a836d01/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
index c60a53d..988da16 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
@@ -23,6 +23,11 @@ import org.apache.kafka.common.TopicPartition;
  */
 public final class RecordMetadata {
 
+    /**
+     * Partition value for record without partition assigned
+     */
+    public static final int UNKNOWN_PARTITION = -1;
+
     private final long offset;
     // The timestamp of the message.
     // If LogAppendTime is used for the topic, the timestamp will be the timestamp returned
by the broker.

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a836d01/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
index 9343a2e..8466d3a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
@@ -20,6 +20,8 @@ package org.apache.kafka.clients.producer.internals;
 import org.apache.kafka.clients.producer.ProducerInterceptor;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.Record;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,7 +78,8 @@ public class ProducerInterceptors<K, V> implements Closeable {
      *
      * This method does not throw exceptions. Exceptions thrown by any of interceptor methods
are caught and ignored.
      *
-     * @param metadata The metadata for the record that was sent (i.e. the partition and
offset). Null if an error occurred.
+     * @param metadata The metadata for the record that was sent (i.e. the partition and
offset).
+     *                 If an error occurred, metadata will only contain valid topic and maybe
partition.
      * @param exception The exception thrown during processing of this record. Null if no
error occurred.
      */
     public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
@@ -91,6 +94,36 @@ public class ProducerInterceptors<K, V> implements Closeable {
     }
 
     /**
+     * This method is called when sending the record fails in {@link ProducerInterceptor#onSend
+     * (ProducerRecord)} method. This method calls {@link ProducerInterceptor#onAcknowledgement(RecordMetadata,
Exception)}
+     * method for each interceptor
+     *
+     * @param record The record from client
+     * @param interceptTopicPartition  The topic/partition for the record if an error occurred
+     *        after partition gets assigned; the topic part of interceptTopicPartition is
the same as in record.
+     * @param exception The exception thrown during processing of this record.
+     */
+    public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition,
Exception exception) {
+        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+            try {
+                if (record == null && interceptTopicPartition == null) {
+                    interceptor.onAcknowledgement(null, exception);
+                } else {
+                    if (interceptTopicPartition == null) {
+                        interceptTopicPartition = new TopicPartition(record.topic(),
+                                                                     record.partition() ==
null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
+                    }
+                    interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition,
-1, -1, Record.NO_TIMESTAMP, -1, -1, -1),
+                                                  exception);
+                }
+            } catch (Exception e) {
+                // do not propagate interceptor exceptions, just log
+                log.warn("Error executing interceptor onAcknowledgement callback", e);
+            }
+        }
+    }
+
+    /**
      * Closes every interceptor in a container.
      */
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a836d01/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
index 5a32dda..2135eb2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
@@ -30,6 +30,9 @@ public class ProducerInterceptorsTest {
     private final TopicPartition tp = new TopicPartition("test", 0);
     private final ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>("test",
0, 1, "value");
     private int onAckCount = 0;
+    private int onErrorAckCount = 0;
+    private int onErrorAckWithTopicSetCount = 0;
+    private int onErrorAckWithTopicPartitionSetCount = 0;
     private int onSendCount = 0;
 
     private class AppendProducerInterceptor implements ProducerInterceptor<Integer, String>
{
@@ -59,6 +62,16 @@ public class ProducerInterceptorsTest {
         @Override
         public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
             onAckCount++;
+            if (exception != null) {
+                onErrorAckCount++;
+                // the length check is just to call topic() method and let it throw an exception
+                // if RecordMetadata.TopicPartition is null
+                if (metadata != null && metadata.topic().length() >= 0) {
+                    onErrorAckWithTopicSetCount++;
+                    if (metadata.partition() >= 0)
+                        onErrorAckWithTopicPartitionSetCount++;
+                }
+            }
             if (throwExceptionOnAck)
                 throw new KafkaException("Injected exception in AppendProducerInterceptor.onAcknowledgement");
         }
@@ -143,5 +156,50 @@ public class ProducerInterceptorsTest {
 
         interceptors.close();
     }
+
+    @Test
+    public void testOnAcknowledgementWithErrorChain() {
+        List<ProducerInterceptor<Integer, String>> interceptorList = new ArrayList<>();
+        AppendProducerInterceptor interceptor1 = new AppendProducerInterceptor("One");
+        interceptorList.add(interceptor1);
+        ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);
+
+        // verify that metadata contains both topic and partition
+        interceptors.onSendError(producerRecord,
+                                 new TopicPartition(producerRecord.topic(), producerRecord.partition()),
+                                 new KafkaException("Test"));
+        assertEquals(1, onErrorAckCount);
+        assertEquals(1, onErrorAckWithTopicPartitionSetCount);
+
+        // verify that metadata contains both topic and partition (because record already
contains partition)
+        interceptors.onSendError(producerRecord, null, new KafkaException("Test"));
+        assertEquals(2, onErrorAckCount);
+        assertEquals(2, onErrorAckWithTopicPartitionSetCount);
+
+        // if producer record does not contain partition, interceptor should get partition
== -1
+        ProducerRecord<Integer, String> record2 = new ProducerRecord<>("test2",
null, 1, "value");
+        interceptors.onSendError(record2, null, new KafkaException("Test"));
+        assertEquals(3, onErrorAckCount);
+        assertEquals(3, onErrorAckWithTopicSetCount);
+        assertEquals(2, onErrorAckWithTopicPartitionSetCount);
+
+        // if producer record does not contain partition, but topic/partition is passed to
+        // onSendError, then interceptor should get valid partition
+        int reassignedPartition = producerRecord.partition() + 1;
+        interceptors.onSendError(record2,
+                                 new TopicPartition(record2.topic(), reassignedPartition),
+                                 new KafkaException("Test"));
+        assertEquals(4, onErrorAckCount);
+        assertEquals(4, onErrorAckWithTopicSetCount);
+        assertEquals(3, onErrorAckWithTopicPartitionSetCount);
+
+        // if both record and topic/partition are null, interceptor should not receive metadata
+        interceptors.onSendError(null, null, new KafkaException("Test"));
+        assertEquals(5, onErrorAckCount);
+        assertEquals(4, onErrorAckWithTopicSetCount);
+        assertEquals(3, onErrorAckWithTopicPartitionSetCount);
+
+        interceptors.close();
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a836d01/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
index cee1247..9e4d0de 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
@@ -32,6 +32,7 @@ public class MockProducerInterceptor implements ProducerInterceptor<String,
Stri
     public static final AtomicInteger ONSEND_COUNT = new AtomicInteger(0);
     public static final AtomicInteger ON_SUCCESS_COUNT = new AtomicInteger(0);
     public static final AtomicInteger ON_ERROR_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger ON_ERROR_WITH_METADATA_COUNT = new AtomicInteger(0);
     public static final String APPEND_STRING_PROP = "mock.interceptor.append";
     private String appendStr;
 
@@ -64,9 +65,12 @@ public class MockProducerInterceptor implements ProducerInterceptor<String,
Stri
 
     @Override
     public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
-        if (exception != null)
+        if (exception != null) {
             ON_ERROR_COUNT.incrementAndGet();
-        else if (metadata != null)
+            if (metadata != null) {
+                ON_ERROR_WITH_METADATA_COUNT.incrementAndGet();
+            }
+        } else if (metadata != null)
             ON_SUCCESS_COUNT.incrementAndGet();
     }
 
@@ -81,5 +85,6 @@ public class MockProducerInterceptor implements ProducerInterceptor<String,
Stri
         ONSEND_COUNT.set(0);
         ON_SUCCESS_COUNT.set(0);
         ON_ERROR_COUNT.set(0);
+        ON_ERROR_WITH_METADATA_COUNT.set(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a836d01/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 9bdbf6d..8014479 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -568,7 +568,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       testProducer.send(null, null)
       fail("Should not allow sending a null record")
     } catch {
-      case e: Throwable => assertEquals("Interceptor should be notified about exception",
1, MockProducerInterceptor.ON_ERROR_COUNT.intValue()) // this is ok
+      case e: Throwable => {
+        assertEquals("Interceptor should be notified about exception", 1, MockProducerInterceptor.ON_ERROR_COUNT.intValue())
+        assertEquals("Interceptor should not receive metadata with an exception when record
is null", 0, MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue())
+      }
     }
 
     // create consumer with interceptor


Mime
View raw message