kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6897; Prevent KafkaProducer.send from blocking when producer is closed (#5027)
Date Sat, 21 Jul 2018 21:03:18 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d11f6f2  KAFKA-6897; Prevent KafkaProducer.send from blocking when producer is closed (#5027)
d11f6f2 is described below

commit d11f6f26b773343246c217f498de3c02916bec6c
Author: Dhruvil Shah <dhruvil@confluent.io>
AuthorDate: Sat Jul 21 14:03:14 2018 -0700

    KAFKA-6897; Prevent KafkaProducer.send from blocking when producer is closed (#5027)
    
    After successful completion of KafkaProducer#close, it is possible that an application
    calls KafkaProducer#send. If the send is invoked for a topic for which we do not have any
    metadata, the producer will block until `max.block.ms` elapses - we do not expect to receive
    any metadata update in this case because Sender (and NetworkClient) has already exited. It
    is only when RecordAccumulator#append is invoked that we notice that the producer has already
    been closed and throw an exception [...]
    
    This patch makes sure `Metadata#awaitUpdate` periodically checks if the network client
    has been closed, and if so bails out as soon as possible.
---
 .../kafka/clients/ManualMetadataUpdater.java       |  4 ++
 .../java/org/apache/kafka/clients/Metadata.java    | 35 ++++++++++--
 .../org/apache/kafka/clients/MetadataUpdater.java  |  9 +++-
 .../org/apache/kafka/clients/NetworkClient.java    |  6 +++
 .../admin/internals/AdminMetadataManager.java      |  4 ++
 .../kafka/clients/producer/KafkaProducer.java      | 25 +++++++--
 .../kafka/clients/producer/MockProducer.java       |  3 --
 .../producer/internals/RecordAccumulator.java      |  7 +--
 .../org/apache/kafka/clients/MetadataTest.java     | 40 +++++++++++---
 .../java/org/apache/kafka/clients/MockClient.java  |  1 +
 .../kafka/clients/producer/KafkaProducerTest.java  | 63 +++++++++++++++++++---
 .../kafka/clients/producer/MockProducerTest.java   | 10 ----
 core/src/main/scala/kafka/tools/MirrorMaker.scala  |  4 +-
 .../kafka/api/BaseProducerSendTest.scala           |  4 +-
 .../kafka/api/ProducerFailureHandlingTest.scala    |  2 +-
 15 files changed, 174 insertions(+), 43 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
index 8252cf3..ec007a6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -89,4 +89,8 @@ public class ManualMetadataUpdater implements MetadataUpdater {
     public void requestUpdate() {
         // Do nothing
     }
+
+    @Override
+    public void close() {
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 91b1587..6c663cf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -25,6 +26,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -48,7 +50,7 @@ import java.util.Set;
  * is removed from the metadata refresh set after an update. Consumers disable topic expiry
since they explicitly
  * manage topics while producers rely on topic expiry to limit the refresh set.
  */
-public final class Metadata {
+public final class Metadata implements Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(Metadata.class);
 
@@ -70,6 +72,7 @@ public final class Metadata {
     private boolean needMetadataForAllTopics;
     private final boolean allowAutoTopicCreation;
     private final boolean topicExpiryEnabled;
+    private boolean isClosed;
 
     public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation)
{
         this(refreshBackoffMs, metadataExpireMs, allowAutoTopicCreation, false, new ClusterResourceListeners());
@@ -100,6 +103,7 @@ public final class Metadata {
         this.listeners = new ArrayList<>();
         this.clusterResourceListeners = clusterResourceListeners;
         this.needMetadataForAllTopics = false;
+        this.isClosed = false;
     }
 
     /**
@@ -164,12 +168,12 @@ public final class Metadata {
      * Wait for metadata update until the current version is larger than the last version
we know of
      */
     public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws
InterruptedException {
-        if (maxWaitMs < 0) {
+        if (maxWaitMs < 0)
             throw new IllegalArgumentException("Max time to wait for metadata updates should
not be < 0 milliseconds");
-        }
+
         long begin = System.currentTimeMillis();
         long remainingWaitMs = maxWaitMs;
-        while (this.version <= lastVersion) {
+        while ((this.version <= lastVersion) && !isClosed()) {
             AuthenticationException ex = getAndClearAuthenticationException();
             if (ex != null)
                 throw ex;
@@ -180,6 +184,8 @@ public final class Metadata {
                 throw new TimeoutException("Failed to update metadata after " + maxWaitMs
+ " ms.");
             remainingWaitMs = maxWaitMs - elapsed;
         }
+        if (isClosed())
+            throw new KafkaException("Requested metadata update after close");
     }
 
     /**
@@ -224,6 +230,8 @@ public final class Metadata {
      */
     public synchronized void update(Cluster newCluster, Set<String> unavailableTopics,
long now) {
         Objects.requireNonNull(newCluster, "cluster should not be null");
+        if (isClosed())
+            throw new IllegalStateException("Update requested after metadata close");
 
         this.needUpdate = false;
         this.lastRefreshMs = now;
@@ -332,6 +340,25 @@ public final class Metadata {
     }
 
     /**
+     * "Close" this metadata instance to indicate that metadata updates are no longer possible.
This is typically used
+     * when the thread responsible for performing metadata updates is exiting and needs a
way to relay this information
+     * to any other thread(s) that could potentially wait on metadata update to come through.
+     */
+    @Override
+    public synchronized void close() {
+        this.isClosed = true;
+        this.notifyAll();
+    }
+
+    /**
+     * Check if this metadata instance has been closed. See {@link #close()} for more information.
+     * @return True if this instance has been closed; false otherwise
+     */
+    public synchronized boolean isClosed() {
+        return this.isClosed;
+    }
+
+    /**
      * MetadataUpdate Listener
      */
     public interface Listener {
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index 09ed995..de765db 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 
+import java.io.Closeable;
 import java.util.List;
 
 /**
@@ -29,7 +30,7 @@ import java.util.List;
  * <p>
  * This class is not thread-safe!
  */
-public interface MetadataUpdater {
+public interface MetadataUpdater extends Closeable {
 
     /**
      * Gets the current cluster info without blocking.
@@ -82,4 +83,10 @@ public interface MetadataUpdater {
      * start of the update if possible (see `maybeUpdate` for more information).
      */
     void requestUpdate();
+
+    /**
+     * Close this updater.
+     */
+    @Override
+    void close();
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 720a781..fd16fe6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -581,6 +581,7 @@ public class NetworkClient implements KafkaClient {
     @Override
     public void close() {
         this.selector.close();
+        this.metadataUpdater.close();
     }
 
     /**
@@ -981,6 +982,11 @@ public class NetworkClient implements KafkaClient {
             this.metadata.requestUpdate();
         }
 
+        @Override
+        public void close() {
+            this.metadata.close();
+        }
+
         /**
          * Return true if there's at least one connection establishment is currently underway
          */
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index 85d3c28..1ad3991 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -118,6 +118,10 @@ public class AdminMetadataManager {
         public void requestUpdate() {
             AdminMetadataManager.this.requestUpdate();
         }
+
+        @Override
+        public void close() {
+        }
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index cb52941..3991467 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -790,12 +790,12 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
      *
      * @throws AuthenticationException if authentication fails. See the exception for more
details
      * @throws AuthorizationException fatal error indicating that the producer is not allowed
to write
-     * @throws IllegalStateException if a transactional.id has been configured and no transaction
has been started
+     * @throws IllegalStateException if a transactional.id has been configured and no transaction
has been started, or
+     *                               when send is invoked after producer has been closed.
      * @throws InterruptException If the thread is interrupted while blocked
      * @throws SerializationException If the key or value are not valid objects given the
configured serializers
      * @throws TimeoutException If the time taken for fetching metadata or allocating memory
for the record has surpassed <code>max.block.ms</code>.
      * @throws KafkaException If a Kafka related error occurs that does not belong to the
public API exceptions.
-     *
      */
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback
callback) {
@@ -804,14 +804,29 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
         return doSend(interceptedRecord, callback);
     }
 
+    // Verify that this producer instance has not been closed. This method throws IllegalStateException
if the producer
+    // has already been closed.
+    private void throwIfProducerClosed() {
+        if (ioThread == null || !ioThread.isAlive())
+            throw new IllegalStateException("Cannot perform operation after producer has
been closed");
+    }
+
     /**
      * Implementation of asynchronously send a record to a topic.
      */
     private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback
callback) {
         TopicPartition tp = null;
         try {
+            throwIfProducerClosed();
             // first make sure the metadata for the topic is available
-            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(),
maxBlockTimeMs);
+            ClusterAndWaitTime clusterAndWaitTime;
+            try {
+                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
+            } catch (KafkaException e) {
+                if (metadata.isClosed())
+                    throw new KafkaException("Producer closed while send in progress", e);
+                throw e;
+            }
             long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
             Cluster cluster = clusterAndWaitTime.cluster;
             byte[] serializedKey;
@@ -896,6 +911,7 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
      * @param partition A specific partition expected to exist in metadata, or null if there's
no preference
      * @param maxWaitMs The maximum time in ms for waiting on the metadata
      * @return The cluster containing topic metadata and the amount of time we waited in
ms
+     * @throws KafkaException for all Kafka-related exceptions, including the case where
this method is called after producer close
      */
     private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs)
throws InterruptedException {
         // add topic to metadata topic list if it is not there already and reset expiry
@@ -1016,8 +1032,9 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
      * Get the partition metadata for the given topic. This can be used for custom partitioning.
      * @throws AuthenticationException if authentication fails. See the exception for more
details
      * @throws AuthorizationException if not authorized to the specified topic. See the exception
for more details
-     * @throws InterruptException If the thread is interrupted while blocked
+     * @throws InterruptException if the thread is interrupted while blocked
      * @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms}
+     * @throws KafkaException for all Kafka-related exceptions, including the case where
this method is called after producer close
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 9e9869a..dc00b47 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -311,9 +311,6 @@ public class MockProducer<K, V> implements Producer<K, V>
{
 
     @Override
     public void close(long timeout, TimeUnit timeUnit) {
-        if (this.closed) {
-            throw new IllegalStateException("MockProducer is already closed.");
-        }
         this.closed = true;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index e2b5844..31c6d75 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.producer.internals;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -195,7 +196,7 @@ public final class RecordAccumulator {
             Deque<ProducerBatch> dq = getOrCreateDeque(tp);
             synchronized (dq) {
                 if (closed)
-                    throw new IllegalStateException("Cannot send after the producer is closed.");
+                    throw new KafkaException("Producer closed while send in progress");
                 RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers,
callback, dq);
                 if (appendResult != null)
                     return appendResult;
@@ -209,7 +210,7 @@ public final class RecordAccumulator {
             synchronized (dq) {
                 // Need to check if producer is closed again after grabbing the dequeue lock.
                 if (closed)
-                    throw new IllegalStateException("Cannot send after the producer is closed.");
+                    throw new KafkaException("Producer closed while send in progress");
 
                 RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers,
callback, dq);
                 if (appendResult != null) {
@@ -700,7 +701,7 @@ public final class RecordAccumulator {
      * Go through incomplete batches and abort them.
      */
     private void abortBatches() {
-        abortBatches(new IllegalStateException("Producer is closed forcefully."));
+        abortBatches(new KafkaException("Producer is closed forcefully."));
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 1188af7..969921e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -46,7 +47,7 @@ public class MetadataTest {
     private long refreshBackoffMs = 100;
     private long metadataExpireMs = 1000;
     private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true);
-    private AtomicReference<String> backgroundError = new AtomicReference<>();
+    private AtomicReference<Exception> backgroundError = new AtomicReference<>();
 
     @After
     public void tearDown() {
@@ -83,6 +84,30 @@ public class MetadataTest {
         assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time)
== 0);
     }
 
+    @Test
+    public void testMetadataAwaitAfterClose() throws InterruptedException {
+        long time = 0;
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
+        metadata.requestUpdate();
+        assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time)
== 0);
+        time += refreshBackoffMs;
+        assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time)
== 0);
+        String topic = "my-topic";
+        metadata.close();
+        Thread t1 = asyncFetch(topic, 500);
+        t1.join();
+        assertTrue(backgroundError.get().getClass() == KafkaException.class);
+        assertTrue(backgroundError.get().toString().contains("Requested metadata update after
close"));
+        clearBackgroundError();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testMetadataUpdateAfterClose() {
+        metadata.close();
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), 1000);
+    }
+
     private static void checkTimeToNextUpdate(long refreshBackoffMs, long metadataExpireMs)
{
         long now = 10000;
 
@@ -409,15 +434,18 @@ public class MetadataTest {
         assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4"));
     }
 
+    private void clearBackgroundError() {
+        backgroundError.set(null);
+    }
+
     private Thread asyncFetch(final String topic, final long maxWaitMs) {
         Thread thread = new Thread() {
             public void run() {
-                while (metadata.fetch().partitionsForTopic(topic).isEmpty()) {
-                    try {
+                try {
+                    while (metadata.fetch().partitionsForTopic(topic).isEmpty())
                         metadata.awaitUpdate(metadata.requestUpdate(), maxWaitMs);
-                    } catch (Exception e) {
-                        backgroundError.set(e.toString());
-                    }
+                } catch (Exception e) {
+                    backgroundError.set(e);
                 }
             }
         };
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 0f64f13..6b41a9e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -533,6 +533,7 @@ public class MockClient implements KafkaClient {
 
     @Override
     public void close() {
+        metadata.close();
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index bf03e46..dd2dd89 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -632,8 +632,8 @@ public class KafkaProducerTest {
         client.setNode(node);
 
         Producer<String, String> producer = new KafkaProducer<>(new ProducerConfig(
-            ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
-            new StringSerializer(), new StringSerializer(), metadata, client);
+                ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
+                new StringSerializer(), new StringSerializer(), metadata, client);
 
         String invalidTopicName = "topic abc";          // Invalid topic name due to space
         ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName,
"HelloKafka");
@@ -641,12 +641,12 @@ public class KafkaProducerTest {
         Set<String> invalidTopic = new HashSet<String>();
         invalidTopic.add(invalidTopicName);
         Cluster metaDataUpdateResponseCluster = new Cluster(cluster.clusterResource().clusterId(),
-                                                            cluster.nodes(),
-                                                            new ArrayList<PartitionInfo>(0),
-                                                            Collections.<String>emptySet(),
-                                                            invalidTopic,
-                                                            cluster.internalTopics(),
-                                                            cluster.controller());
+                cluster.nodes(),
+                new ArrayList<PartitionInfo>(0),
+                Collections.<String>emptySet(),
+                invalidTopic,
+                cluster.internalTopics(),
+                cluster.controller());
         client.prepareMetadataUpdate(metaDataUpdateResponseCluster, Collections.<String>emptySet());
 
         Future<RecordMetadata> future = producer.send(record);
@@ -654,4 +654,51 @@ public class KafkaProducerTest {
         assertEquals("Cluster has incorrect invalid topic list.", metaDataUpdateResponseCluster.invalidTopics(),
metadata.fetch().invalidTopics());
         TestUtils.assertFutureError(future, InvalidTopicException.class);
     }
+
+    @Test
+    public void testCloseWhenWaitingForMetadataUpdate() throws InterruptedException {
+        Properties props = new Properties();
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MAX_VALUE);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+        // Simulate a case where metadata for a particular topic is not available. This will
cause KafkaProducer#send to
+        // block in Metadata#awaitUpdate for the configured max.block.ms. When close() is
invoked, KafkaProducer#send should
+        // return with a KafkaException.
+        String topicName = "test";
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster();
+        Node node = cluster.nodes().get(0);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE, false);
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+
+        Producer<String, String> producer = new KafkaProducer<>(
+                new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
new StringSerializer())),
+                new StringSerializer(), new StringSerializer(), metadata, client);
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        final AtomicReference<Exception> sendException = new AtomicReference<>();
+
+        try {
+            executor.submit(() -> {
+                try {
+                    // Metadata for topic "test" will not be available which will cause us
to block indefinitely until
+                    // KafkaProducer#close is invoked.
+                    producer.send(new ProducerRecord<>(topicName, "key", "value"));
+                    fail();
+                } catch (Exception e) {
+                    sendException.set(e);
+                }
+            });
+
+            // Wait until metadata update for the topic has been requested
+            TestUtils.waitForCondition(() -> metadata.containsTopic(topicName), "Timeout
when waiting for topic to be added to metadata");
+            producer.close(0, TimeUnit.MILLISECONDS);
+            TestUtils.waitForCondition(() -> sendException.get() != null, "No producer
exception within timeout");
+            assertEquals(KafkaException.class, sendException.get().getClass());
+        } finally {
+            executor.shutdownNow();
+        }
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 27fac28..7a8c710 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -637,16 +637,6 @@ public class MockProducerTest {
     }
 
     @Test
-    public void shouldThrowOnCloseIfProducerIsClosed() {
-        buildMockProducer(true);
-        producer.close();
-        try {
-            producer.close();
-            fail("Should have thrown as producer is already closed");
-        } catch (IllegalStateException e) { }
-    }
-
-    @Test
     public void shouldThrowOnFenceProducerIfProducerIsClosed() {
         buildMockProducer(true);
         producer.close();
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 92396a7..9cc6ebe 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -31,7 +31,7 @@ import kafka.utils.{CommandLineUtils, CoreUtils, Logging, Whitelist}
 import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig,
ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord,
RecordMetadata}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.errors.WakeupException
@@ -357,6 +357,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
               trace("Caught NoRecordsException, continue iteration.")
             case _: WakeupException =>
               trace("Caught WakeupException, continue iteration.")
+            case e: KafkaException if (shuttingDown || exitingOnSendFailure) =>
+              trace(s"Ignoring caught KafkaException during shutdown. sendFailure: $exitingOnSendFailure.",
e)
           }
           maybeFlushAndCommitOffsets()
         }
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index ee0e90f..dc4041f 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -35,6 +35,7 @@ import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
 import scala.collection.mutable.{ArrayBuffer, Buffer}
+import scala.concurrent.ExecutionException
 
 abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
@@ -446,8 +447,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
           future.get()
           fail("No message should be sent successfully.")
         } catch {
-          case e: Exception =>
-            assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.",
e.getMessage)
+          case e: ExecutionException => assertEquals(classOf[KafkaException], e.getCause.getClass)
         }
       }
       assertEquals("Fetch response should have no message returned.", 0, consumer.poll(50).count)
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 7969485..9b77c2d 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -205,7 +205,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     // create topic
     createTopic(topic1, replicationFactor = numServers)
 
-    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes,
"value".getBytes)
+    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, "key".getBytes,
"value".getBytes)
 
     // first send a message to make sure the metadata is refreshed
     producer1.send(record).get


Mime
View raw message