kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6833; Producer should await metadata for unknown partitions (#6073)
Date Wed, 09 Jan 2019 02:05:23 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e325676  KAFKA-6833; Producer should await metadata for unknown partitions (#6073)
e325676 is described below

commit e32567699451e2fb0ccd63dd8a3df582cfd18d61
Author: Bob Barrett <bob.barrett@outlook.com>
AuthorDate: Tue Jan 8 21:05:13 2019 -0500

    KAFKA-6833; Producer should await metadata for unknown partitions (#6073)
    
    This patch changes the behavior of KafkaProducer.waitOnMetadata to wait up to max.block.ms
    when the partition specified in the produce request is out of the range of partitions present
    in the metadata. This improves the user experience in the case when partitions are added to
    a topic and a client attempts to produce to one of the new partitions before the metadata
    has propagated to the brokers. Tested with unit tests.
    
    Reviewers: Arjun Satish <arjun@confluent.io>, Jason Gustafson <jason@confluent.io>
---
 .../kafka/clients/producer/KafkaProducer.java      |  31 ++--
 .../kafka/clients/producer/KafkaProducerTest.java  | 182 +++++++++++++--------
 .../kafka/api/BaseProducerSendTest.scala           |  20 ++-
 .../kafka/api/ProducerFailureHandlingTest.scala    |  14 +-
 4 files changed, 153 insertions(+), 94 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 85ed9f8..540493d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -966,12 +966,15 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
         long begin = time.milliseconds();
         long remainingWaitMs = maxWaitMs;
         long elapsed;
-        // Issue metadata requests until we have metadata for the topic or maxWaitTimeMs is exceeded.
-        // In case we already have cached metadata for the topic, but the requested partition is greater
-        // than expected, issue an update request only once. This is necessary in case the metadata
+        // Issue metadata requests until we have metadata for the topic and the requested partition,
+        // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
         // is stale and the number of partitions for this topic has increased in the meantime.
         do {
-            log.trace("Requesting metadata update for topic {}.", topic);
+            if (partition != null) {
+                log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
+            } else {
+                log.trace("Requesting metadata update for topic {}.", topic);
+            }
             metadata.add(topic);
             int version = metadata.requestUpdate();
             sender.wakeup();
@@ -979,24 +982,26 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
                 metadata.awaitUpdate(version, remainingWaitMs);
             } catch (TimeoutException ex) {
                 // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
-                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+                throw new TimeoutException(
+                        String.format("Topic %s not present in metadata after %d ms.",
+                                topic, maxWaitMs));
             }
             cluster = metadata.fetch();
             elapsed = time.milliseconds() - begin;
-            if (elapsed >= maxWaitMs)
-                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+            if (elapsed >= maxWaitMs) {
+                throw new TimeoutException(partitionsCount == null ?
+                        String.format("Topic %s not present in metadata after %d ms.",
+                                topic, maxWaitMs) :
+                        String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
+                                partition, topic, partitionsCount, maxWaitMs));
+            }
             if (cluster.unauthorizedTopics().contains(topic))
                 throw new TopicAuthorizationException(topic);
             if (cluster.invalidTopics().contains(topic))
                 throw new InvalidTopicException(topic);
             remainingWaitMs = maxWaitMs - elapsed;
             partitionsCount = cluster.partitionCountForTopic(topic);
-        } while (partitionsCount == null);
-
-        if (partition != null && partition >= partitionsCount) {
-            throw new KafkaException(
-                    String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
-        }
+        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));
 
         return new ClusterAndWaitTime(cluster, elapsed);
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 872d390..a73ab7c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -58,6 +58,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -65,9 +66,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.junit.Assert.assertArrayEquals;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -82,6 +83,27 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class KafkaProducerTest {
+    private String topic = "topic";
+    private Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
+    private final Cluster emptyCluster = new Cluster(null, nodes,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            Collections.emptySet());
+    private final Cluster onePartitionCluster = new Cluster(
+            "dummy",
+            Collections.singletonList(new Node(0, "host1", 1000)),
+            Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
+            Collections.emptySet(),
+            Collections.emptySet());
+    private final Cluster threePartitionCluster = new Cluster(
+            "dummy",
+            Collections.singletonList(new Node(0, "host1", 1000)),
+            Arrays.asList(
+                    new PartitionInfo(topic, 0, null, null, null),
+                    new PartitionInfo(topic, 1, null, null, null),
+                    new PartitionInfo(topic, 2, null, null, null)),
+            Collections.emptySet(),
+            Collections.emptySet());
 
     @Test
     public void testMetricsReporterAutoGeneratedClientId() {
@@ -289,22 +311,10 @@ public class KafkaProducerTest {
     public void testMetadataFetch() throws InterruptedException {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        String topic = "topic";
-        Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
-        final Cluster emptyCluster = new Cluster(null, nodes,
-                Collections.emptySet(),
-                Collections.emptySet(),
-                Collections.emptySet());
-        final Cluster cluster = new Cluster(
-                "dummy",
-                Collections.singletonList(new Node(0, "host1", 1000)),
-                Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
-                Collections.emptySet(),
-                Collections.emptySet());
         Metadata metadata = mock(Metadata.class);
 
         // Return empty cluster 4 times and cluster from then on
-        when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, emptyCluster, cluster);
+        when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, emptyCluster, onePartitionCluster);
 
         KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs,
new StringSerializer(),
                 new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata),
null, Time.SYSTEM) {
@@ -338,91 +348,127 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void testMetadataFetchOnStaleMetadata() throws Exception {
+    public void testMetadataTimeoutWithMissingTopic() throws Exception {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        String topic = "topic";
-        ProducerRecord<String, String> initialRecord = new ProducerRecord<>(topic,
"value");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+
         // Create a record with a partition higher than the initial (outdated) partition range
-        ProducerRecord<String, String> extendedRecord = new ProducerRecord<>(topic, 2, null, "value");
-        Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
-        final Cluster emptyCluster = new Cluster(null, nodes,
-                Collections.emptySet(),
-                Collections.emptySet(),
-                Collections.emptySet());
-        final Cluster initialCluster = new Cluster(
-                "dummy",
-                Collections.singletonList(new Node(0, "host1", 1000)),
-                Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
-                Collections.emptySet(),
-                Collections.emptySet());
-        final Cluster extendedCluster = new Cluster(
-                "dummy",
-                Collections.singletonList(new Node(0, "host1", 1000)),
-                Arrays.asList(
-                        new PartitionInfo(topic, 0, null, null, null),
-                        new PartitionInfo(topic, 1, null, null, null),
-                        new PartitionInfo(topic, 2, null, null, null)),
-                Collections.emptySet(),
-                Collections.emptySet());
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");
         Metadata metadata = mock(Metadata.class);
 
+        MockTime mockTime = new MockTime();
         AtomicInteger invocationCount = new AtomicInteger(0);
-
-        // Return empty cluster 4 times, initialCluster 5 times and extendedCluster after
that
         when(metadata.fetch()).then(invocation -> {
             invocationCount.incrementAndGet();
-            if (invocationCount.get() > 9)
-                return extendedCluster;
-            else if (invocationCount.get() > 4)
-                return initialCluster;
+            if (invocationCount.get() == 5) {
+                mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000);
+            }
+
             return emptyCluster;
         });
 
         KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs,
new StringSerializer(),
-                new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata),
null, Time.SYSTEM) {
+                new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata),
null, mockTime) {
             @Override
             Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata)
{
                 // give Sender its own Metadata instance so that we can isolate Metadata
calls from KafkaProducer
                 return super.newSender(logContext, kafkaClient, new Metadata(0, 100_000,
true));
             }
         };
-        producer.send(initialRecord);
 
-        // One request update for each empty cluster returned
+        // Four request updates where the topic isn't present, at which point the timeout
expires and a
+        // TimeoutException is thrown
+        Future future = producer.send(record);
         verify(metadata, times(4)).requestUpdate();
         verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
         verify(metadata, times(5)).fetch();
-
-        // Should not request update if metadata is available and records are within range
-        producer.send(initialRecord);
-        verify(metadata, times(4)).requestUpdate();
-        verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
-        verify(metadata, times(6)).fetch();
-
-        // One request update followed by exception if topic metadata is available but metadata
response still returns
-        // the same partition size (either because metadata are still stale at the broker
too or because
-        // there weren't any partitions added in the first place)
         try {
-            producer.send(extendedRecord);
-            fail("Expected KafkaException to be raised");
-        } catch (KafkaException e) {
-            // expected
+            future.get();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        } finally {
+            producer.close(0, TimeUnit.MILLISECONDS);
         }
-        verify(metadata, times(5)).requestUpdate();
-        verify(metadata, times(5)).awaitUpdate(anyInt(), anyLong());
-        verify(metadata, times(8)).fetch();
+    }
 
+    @Test
+    public void testMetadataWithPartitionOutOfRange() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+
+        // Create a record with a partition higher than the initial (outdated) partition range
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");
+        Metadata metadata = mock(Metadata.class);
+
+        MockTime mockTime = new MockTime();
+
+        when(metadata.fetch()).thenReturn(onePartitionCluster, onePartitionCluster, threePartitionCluster);
+
+        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs,
new StringSerializer(),
+                new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata),
null, mockTime) {
+            @Override
+            Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata)
{
+                // give Sender its own Metadata instance so that we can isolate Metadata
calls from KafkaProducer
+                return super.newSender(logContext, kafkaClient, new Metadata(0, 100_000,
true));
+            }
+        };
         // One request update if metadata is available but outdated for the given record
-        producer.send(extendedRecord);
-        verify(metadata, times(6)).requestUpdate();
-        verify(metadata, times(6)).awaitUpdate(anyInt(), anyLong());
-        verify(metadata, times(10)).fetch();
+        producer.send(record);
+        verify(metadata, times(2)).requestUpdate();
+        verify(metadata, times(2)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(3)).fetch();
 
         producer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
+    public void testMetadataTimeoutWithPartitionOutOfRange() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+
+        // Create a record with a partition higher than the initial (outdated) partition range
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");
+        Metadata metadata = mock(Metadata.class);
+
+        MockTime mockTime = new MockTime();
+        AtomicInteger invocationCount = new AtomicInteger(0);
+        when(metadata.fetch()).then(invocation -> {
+            invocationCount.incrementAndGet();
+            if (invocationCount.get() == 5) {
+                mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000);
+            }
+
+            return onePartitionCluster;
+        });
+
+        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs,
new StringSerializer(),
+                new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata),
null, mockTime) {
+            @Override
+            Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata)
{
+                // give Sender its own Metadata instance so that we can isolate Metadata
calls from KafkaProducer
+                return super.newSender(logContext, kafkaClient, new Metadata(0, 100_000,
true));
+            }
+        };
+
+        // Four request updates where the requested partition is out of range, at which point
the timeout expires
+        // and a TimeoutException is thrown
+        Future future = producer.send(record);
+        verify(metadata, times(4)).requestUpdate();
+        verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(5)).fetch();
+        try {
+            future.get();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        } finally {
+            producer.close(0, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Test
     public void testTopicRefreshInMetadata() throws InterruptedException {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 09a6188..9d454e9 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -21,19 +21,20 @@ import java.nio.charset.StandardCharsets
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 
-import collection.JavaConverters._
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.Buffer
 import scala.concurrent.ExecutionException
 
@@ -71,13 +72,15 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   protected def createProducer(brokerList: String,
                                lingerMs: Int = 0,
                                batchSize: Int = 16384,
-                               compressionType: String = "none"): KafkaProducer[Array[Byte],Array[Byte]]
= {
+                               compressionType: String = "none",
+                               maxBlockMs: Long = 60 * 1000L): KafkaProducer[Array[Byte],Array[Byte]]
= {
     val producer = TestUtils.createProducer(brokerList,
       compressionType = compressionType,
       securityProtocol = securityProtocol,
       trustStoreFile = trustStoreFile,
       saslProperties = clientSaslProperties,
-      lingerMs = lingerMs)
+      lingerMs = lingerMs,
+      maxBlockMs = maxBlockMs)
     registerProducer(producer)
   }
 
@@ -344,7 +347,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     */
   @Test
   def testSendBeforeAndAfterPartitionExpansion() {
-    val producer = createProducer(brokerList)
+    val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L)
 
     // create topic
     createTopic(topic, 1, 2)
@@ -364,10 +367,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     // Trying to send a record to a partition beyond topic's partition range before adding
the partition should fail.
     val partition1 = 1
     try {
-      producer.send(new ProducerRecord(topic, partition1, null, "value".getBytes(StandardCharsets.UTF_8)))
+      producer.send(new ProducerRecord(topic, partition1, null, "value".getBytes(StandardCharsets.UTF_8))).get()
       fail("Should not allow sending a record to a partition not present in the metadata")
     } catch {
-      case _: KafkaException => // this is ok
+      case e: ExecutionException => e.getCause match {
+        case _: TimeoutException => // this is ok
+        case ex => throw new Exception("Sending to a partition not present in the metadata should result in a TimeoutException", ex)
+      }
     }
 
     val existingAssignment = zkClient.getReplicaAssignmentForTopics(Set(topic)).map {
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index b7d3ecb..17d68d1 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.api
 
-import java.util.concurrent.{ExecutionException, TimeoutException}
+import java.util.concurrent.ExecutionException
 import java.util.Properties
 
 import kafka.integration.KafkaServerTestHarness
@@ -25,7 +25,6 @@ import kafka.log.LogConfig
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch}
@@ -182,8 +181,8 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   }
 
   /**
-    * Send with invalid partition id should throw KafkaException when partition is higher
than the upper bound of
-    * partitions.
+    * Send with invalid partition id should return ExecutionException caused by TimeoutException
+    * when partition is higher than the upper bound of partitions.
     */
   @Test
   def testInvalidPartition() {
@@ -192,8 +191,11 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
 
     // create a record with incorrect partition id (higher than the number of partitions),
send should fail
     val higherRecord = new ProducerRecord(topic1, 1, "key".getBytes, "value".getBytes)
-    intercept[KafkaException] {
-      producer1.send(higherRecord)
+    intercept[ExecutionException] {
+      producer1.send(higherRecord).get
+    }.getCause match {
+      case _: TimeoutException => // this is ok
+      case ex => throw new Exception("Sending to a partition not present in the metadata should result in a TimeoutException", ex)
     }
   }
 


Mime
View raw message