kafka-commits mailing list archives

From vvcep...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9310: Handle UnknownProducerId from RecordCollector.send (#7845)
Date Sat, 21 Dec 2019 18:41:00 GMT
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 31a9f6a  KAFKA-9310: Handle UnknownProducerId from RecordCollector.send (#7845)
31a9f6a is described below

commit 31a9f6add18e8489597f4ba159f77b2d2bd0531a
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Sat Dec 21 12:30:36 2019 -0600

    KAFKA-9310: Handle UnknownProducerId from RecordCollector.send (#7845)
    
    Reviewers:  Matthias J. Sax <mjsax@apache.org>
---
 .../processor/internals/RecordCollectorImpl.java   | 16 ++--
 .../processor/internals/RecordCollectorTest.java   | 90 +++++++++++++++++++---
 2 files changed, 91 insertions(+), 15 deletions(-)
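
At its heart, the patch replaces a check for one specific wrapped exception type (ProducerFencedException) with a small predicate, isRecoverable, that also treats a wrapped UnknownProducerIdException as recoverable. As a self-contained illustration of that predicate — the class name and main method here are ours for demonstration, not part of the commit:

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.common.errors.UnknownProducerIdException;

    public class RecoverableCheckSketch {
        // Mirrors RecordCollectorImpl.isRecoverable from the first hunk below.
        static boolean isRecoverable(final RuntimeException e) {
            return e instanceof KafkaException && (
                e.getCause() instanceof ProducerFencedException ||
                    e.getCause() instanceof UnknownProducerIdException);
        }

        public static void main(final String[] args) {
            System.out.println(isRecoverable(new KafkaException(new ProducerFencedException("fenced"))));  // true
            System.out.println(isRecoverable(new KafkaException(new UnknownProducerIdException("lost"))));  // true: the new case
            System.out.println(isRecoverable(new KafkaException("no recoverable cause")));                  // false
        }
    }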

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index d72115e..ea573f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.Collections;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -35,8 +34,8 @@ import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownProducerIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler;
@@ -45,6 +44,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.slf4j.Logger;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -236,12 +236,10 @@ public class RecordCollectorImpl implements RecordCollector {
                 e
             );
         } catch (final RuntimeException uncaughtException) {
-            if (uncaughtException instanceof KafkaException &&
-                uncaughtException.getCause() instanceof ProducerFencedException) {
-                final KafkaException kafkaException = (KafkaException) uncaughtException;
+            if (isRecoverable(uncaughtException)) {
                 // producer.send() call may throw a KafkaException which wraps a FencedException,
                 // in this case we should throw its wrapped inner cause so that it can be captured and re-wrapped as TaskMigrationException
-                throw new RecoverableClientException("Caught a wrapped ProducerFencedException", kafkaException);
+                throw new RecoverableClientException("Caught a recoverable exception", uncaughtException);
             } else {
                 throw new StreamsException(
                     String.format(
@@ -257,6 +255,12 @@ public class RecordCollectorImpl implements RecordCollector {
         }
     }
 
+    public static boolean isRecoverable(final RuntimeException uncaughtException) {
+        return uncaughtException instanceof KafkaException && (
+            uncaughtException.getCause() instanceof ProducerFencedException ||
+                uncaughtException.getCause() instanceof UnknownProducerIdException);
+    }
+
     private void checkForException() {
         if (sendException != null) {
             throw sendException;
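
One subtlety worth noting before the test diff: an error reported through the producer callback cannot be thrown on the caller's stack at the moment it occurs, so the collector stores it and rethrows it from checkForException() on the next interaction. That is why the callback-based tests below call send() twice. A minimal, self-contained model of the pattern — the names here are illustrative, not the real class (the real field and call sites live in RecordCollectorImpl):

    // Sketch of the defer-and-rethrow pattern; not the actual implementation.
    public class DeferredErrorSketch {
        private volatile RuntimeException sendException;  // set from the async callback

        public void send(final Runnable simulatedCompletion) {
            checkForException();          // surfaces an error from a *previous* send
            simulatedCompletion.run();    // stands in for the producer firing our callback
        }

        public void onCompletion(final RuntimeException error) {
            if (error != null) {
                sendException = error;    // can only record it; wrong thread to throw from
            }
        }

        private void checkForException() {
            if (sendException != null) {
                throw sendException;
            }
        }

        public static void main(final String[] args) {
            final DeferredErrorSketch collector = new DeferredErrorSketch();
            collector.send(() -> collector.onCompletion(new RuntimeException("send failed")));  // no throw yet
            try {
                collector.send(() -> { });  // the stored error surfaces on the second send
            } catch (final RuntimeException e) {
                System.out.println("second send threw: " + e.getMessage());
            }
        }
    }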
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 72d315f..7bc616d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.UnknownProducerIdException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
@@ -53,6 +54,7 @@ import java.util.concurrent.Future;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
@@ -174,42 +176,112 @@ public class RecordCollectorTest {
         assertThat(collector.offsets().get(topicPartition), equalTo(2L));
     }
 
-    @SuppressWarnings("unchecked")
-    @Test(expected = StreamsException.class)
+    @Test
     public void shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException() {
         final RecordCollector collector = new RecordCollectorImpl(
             "test",
             logContext,
             new DefaultProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
-        collector.init(new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+        collector.init(new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
             @Override
-            public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+            public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
                 throw new KafkaException();
             }
         });
 
-        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer,
streamPartitioner);
+        final StreamsException thrown = assertThrows(StreamsException.class, () ->
+            collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer,
streamPartitioner)
+        );
+        assertThat(thrown.getCause(), instanceOf(KafkaException.class));
     }
 
-    @SuppressWarnings("unchecked")
-    @Test(expected = RecoverableClientException.class)
+    @Test
+    public void shouldThrowRecoverableExceptionOnProducerFencedException() {
+        final RecordCollector collector = new RecordCollectorImpl(
+            "test",
+            logContext,
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("dropped-records")
+        );
+        collector.init(new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+            @Override
+            public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
+                throw new KafkaException(new ProducerFencedException("asdf"));
+            }
+        });
+
+        final RecoverableClientException thrown = assertThrows(RecoverableClientException.class, () ->
+            collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
+        );
+        assertThat(thrown.getCause(), instanceOf(KafkaException.class));
+        assertThat(thrown.getCause().getCause(), instanceOf(ProducerFencedException.class));
+    }
+
+    @Test
+    public void shouldThrowRecoverableExceptionOnUnknownProducerException() {
+        final RecordCollector collector = new RecordCollectorImpl(
+            "test",
+            logContext,
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("dropped-records")
+        );
+        collector.init(new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+            @Override
+            public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
+                throw new KafkaException(new UnknownProducerIdException("asdf"));
+            }
+        });
+
+        final RecoverableClientException thrown = assertThrows(RecoverableClientException.class, () ->
+            collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
+        );
+        assertThat(thrown.getCause(), instanceOf(KafkaException.class));
+        assertThat(thrown.getCause().getCause(), instanceOf(UnknownProducerIdException.class));
+    }
+
+    @Test
     public void shouldThrowRecoverableExceptionWhenProducerFencedInCallback() {
         final RecordCollector collector = new RecordCollectorImpl(
             "test",
             logContext,
             new DefaultProductionExceptionHandler(),
             new Metrics().sensor("skipped-records"));
-        collector.init(new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+        collector.init(new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
             @Override
-            public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+            public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
                 callback.onCompletion(null, new ProducerFencedException("asdf"));
                 return null;
             }
         });
 
         collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer,
streamPartitioner);
+        final RecoverableClientException thrown = assertThrows(RecoverableClientException.class,
() ->
+            collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer,
streamPartitioner)
+        );
+        assertThat(thrown.getCause(), instanceOf(ProducerFencedException.class));
+    }
+
+    @Test
+    public void shouldThrowRecoverableExceptionWhenProducerForgottenInCallback() {
+        final RecordCollector collector = new RecordCollectorImpl(
+            "test",
+            logContext,
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records"));
+        collector.init(new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+            @Override
+            public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
+                callback.onCompletion(null, new UnknownProducerIdException("asdf"));
+                return null;
+            }
+        });
+
         collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer,
streamPartitioner);
+        final RecoverableClientException thrown = assertThrows(RecoverableClientException.class,
() ->
+            collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer,
streamPartitioner)
+        );
+        assertThat(thrown.getCause(), instanceOf(UnknownProducerIdException.class));
     }
 
     @SuppressWarnings("unchecked")
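
The tests also migrate from @Test(expected = ...), which passes if any statement in the method throws, to assertThrows plus Hamcrest's instanceOf, which pins the exception to a single call and lets the test inspect the wrapped cause. The pattern in isolation — a hypothetical test of our own, not from the commit:

    import static org.hamcrest.MatcherAssert.assertThat;
    import static org.hamcrest.Matchers.instanceOf;
    import static org.junit.Assert.assertThrows;

    import org.junit.Test;

    public class AssertThrowsPatternTest {
        @Test
        public void shouldExposeBothThrowSiteAndCause() {
            final RuntimeException thrown = assertThrows(RuntimeException.class, () -> {
                throw new RuntimeException(new IllegalStateException("inner"));
            });
            // @Test(expected = ...) could not make this second assertion at all.
            assertThat(thrown.getCause(), instanceOf(IllegalStateException.class));
        }
    }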

