kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-3562; Handle topic deletion during a send
Date Mon, 11 Jul 2016 16:31:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 383cec9cf -> c43926822


KAFKA-3562; Handle topic deletion during a send

Fix a timing window in the producer by holding onto the cluster object while
processing a send request, so that changes to the cluster during a metadata
refresh don't cause an NPE if a topic is deleted.
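
For context, the sketch below illustrates the pattern the patch applies, using
hypothetical MetadataHolder and ClusterSnapshot stand-ins rather than the real
Metadata and Cluster classes: fetch the cluster snapshot once, then use that
same immutable snapshot for every step of the send, so that a concurrent topic
deletion (which swaps in a new cluster during a metadata refresh) cannot make
partitionsForTopic(topic) return null halfway through.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    public class SnapshotSketch {

        // Immutable snapshot, standing in for org.apache.kafka.common.Cluster.
        static final class ClusterSnapshot {
            private final Map<String, List<Integer>> partitionsByTopic;
            ClusterSnapshot(Map<String, List<Integer>> partitionsByTopic) {
                this.partitionsByTopic = Collections.unmodifiableMap(partitionsByTopic);
            }
            List<Integer> partitionsForTopic(String topic) {
                return partitionsByTopic.get(topic); // null if topic unknown or deleted
            }
        }

        // Mutable holder, standing in for org.apache.kafka.clients.Metadata.
        // A background refresh may replace the snapshot at any time.
        static final class MetadataHolder {
            private volatile ClusterSnapshot current;
            MetadataHolder(ClusterSnapshot initial) { this.current = initial; }
            ClusterSnapshot fetch() { return current; }
            void update(ClusterSnapshot next) { current = next; }
        }

        // Fixed behaviour: fetch once and hold the snapshot for the whole send.
        // The buggy variant called metadata.fetch() a second time when choosing
        // the partition, and that second fetch could observe a refresh that had
        // dropped the topic, yielding a null partition list and an NPE.
        static int choosePartition(MetadataHolder metadata, String topic) {
            ClusterSnapshot cluster = metadata.fetch();
            List<Integer> partitions = cluster.partitionsForTopic(topic);
            if (partitions == null)
                throw new IllegalStateException("Metadata for topic " + topic + " is not available");
            return partitions.get(0); // partitioning decision uses the same snapshot
        }
    }

The actual patch carries the snapshot together with the time spent waiting in
the new ClusterAndWaitTime holder, as shown in the diff below.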

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Sriharsha Chintalapani <harsha@hortonworks.com>, Ewen Cheslack-Postava <ewen@confluent.io>,
Ismael Juma <ismael@juma.me.uk>

Closes #1478 from rajinisivaram/KAFKA-3562


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4392682
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4392682
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4392682

Branch: refs/heads/trunk
Commit: c439268224e3178002bfa28bc048722870f992e3
Parents: 383cec9
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Mon Jul 11 17:06:59 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon Jul 11 17:06:59 2016 +0100

----------------------------------------------------------------------
 build.gradle                                    |  3 +
 .../kafka/clients/producer/KafkaProducer.java   | 41 +++++++++----
 .../clients/producer/KafkaProducerTest.java     | 62 ++++++++++++++++++++
 3 files changed, 93 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4392682/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 6580927..b3be020 100644
--- a/build.gradle
+++ b/build.gradle
@@ -576,6 +576,9 @@ project(':clients') {
 
     testCompile libs.bcpkix
     testCompile libs.junit
+    testCompile libs.easymock
+    testCompile libs.powermock
+    testCompile libs.powermockEasymock
 
     testRuntime libs.slf4jlog4j
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4392682/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a61ee93..05d9377 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -437,8 +437,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         TopicPartition tp = null;
         try {
             // first make sure the metadata for the topic is available
-            long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
-            long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
+            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
+            long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
+            Cluster cluster = clusterAndWaitTime.cluster;
             byte[] serializedKey;
             try {
                 serializedKey = keySerializer.serialize(record.topic(), record.key());
@@ -455,7 +456,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                         " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                         " specified in value.serializer");
             }
-            int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
+
+            int partition = partition(record, serializedKey, serializedValue, cluster);
             int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
             ensureValidRecordSize(serializedSize);
             tp = new TopicPartition(record.topic(), partition);
@@ -508,17 +510,19 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * Wait for cluster metadata including partitions for the given topic to be available.
      * @param topic The topic we want metadata for
      * @param maxWaitMs The maximum time in ms for waiting on the metadata
-     * @return The amount of time we waited in ms
+     * @return The cluster containing topic metadata and the amount of time we waited in ms
      */
-    private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
+    private ClusterAndWaitTime waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
         // add topic to metadata topic list if it is not there already and reset expiry
         this.metadata.add(topic);
-        if (metadata.fetch().partitionsForTopic(topic) != null)
-            return 0;
+        Cluster cluster = metadata.fetch();
+        if (cluster.partitionsForTopic(topic) != null)
+            return new ClusterAndWaitTime(cluster, 0);
 
         long begin = time.milliseconds();
         long remainingWaitMs = maxWaitMs;
-        while (metadata.fetch().partitionsForTopic(topic) == null) {
+        long elapsed = 0;
+        while (cluster.partitionsForTopic(topic) == null) {
             log.trace("Requesting metadata update for topic {}.", topic);
             int version = metadata.requestUpdate();
             sender.wakeup();
@@ -528,14 +532,15 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
             }
-            long elapsed = time.milliseconds() - begin;
+            cluster = metadata.fetch();
+            elapsed = time.milliseconds() - begin;
             if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
-            if (metadata.fetch().unauthorizedTopics().contains(topic))
+            if (cluster.unauthorizedTopics().contains(topic))
                 throw new TopicAuthorizationException(topic);
             remainingWaitMs = maxWaitMs - elapsed;
         }
-        return time.milliseconds() - begin;
+        return new ClusterAndWaitTime(cluster, elapsed);
     }
 
     /**
@@ -600,12 +605,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
+        Cluster cluster;
         try {
-            waitOnMetadata(topic, this.maxBlockTimeMs);
+            cluster = waitOnMetadata(topic, this.maxBlockTimeMs).cluster;
         } catch (InterruptedException e) {
             throw new InterruptException(e);
         }
-        return this.metadata.fetch().partitionsForTopic(topic);
+        return cluster.partitionsForTopic(topic);
     }
 
     /**
@@ -724,6 +730,15 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             cluster);
     }
 
+    private static class ClusterAndWaitTime {
+        final Cluster cluster;
+        final long waitedOnMetadataMs;
+        ClusterAndWaitTime(Cluster cluster, long waitedOnMetadataMs) {
+            this.cluster = cluster;
+            this.waitedOnMetadataMs = waitedOnMetadataMs;
+        }
+    }
+
     private static class FutureFailure implements Future<RecordMetadata> {
 
         private final ExecutionException exception;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4392682/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 461e3cf..1780e2f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -16,21 +16,37 @@
  */
 package org.apache.kafka.clients.producer;
 
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockProducerInterceptor;
 import org.apache.kafka.test.MockSerializer;
+import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.support.membermodification.MemberModifier;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareOnlyThisForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Properties;
 import java.util.Map;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
 public class KafkaProducerTest {
 
     @Test
@@ -123,4 +139,50 @@ public class KafkaProducerTest {
         config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, -2);
         new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
     }
+
+    @PrepareOnlyThisForTest(Metadata.class)
+    @Test
+    public void testMetadataFetch() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+        Metadata metadata = PowerMock.createNiceMock(Metadata.class);
+        MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
+
+        String topic = "topic";
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
+        Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
+        final Cluster emptyCluster = new Cluster(nodes,
+                Collections.<PartitionInfo>emptySet(),
+                Collections.<String>emptySet());
+        final Cluster cluster = new Cluster(
+                Collections.singletonList(new Node(0, "host1", 1000)),
+                Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
+                Collections.<String>emptySet());
+
+        // Expect exactly one fetch for each attempt to refresh while topic metadata is not available
+        final int refreshAttempts = 5;
+        EasyMock.expect(metadata.fetch()).andReturn(emptyCluster).times(refreshAttempts - 1);
+        EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
+        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
+        PowerMock.replay(metadata);
+        producer.send(record);
+        PowerMock.verify(metadata);
+
+        // Expect exactly one fetch if topic metadata is available
+        PowerMock.reset(metadata);
+        EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
+        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
+        PowerMock.replay(metadata);
+        producer.send(record, null);
+        PowerMock.verify(metadata);
+
+        // Expect exactly one fetch if topic metadata is available
+        PowerMock.reset(metadata);
+        EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
+        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
+        PowerMock.replay(metadata);
+        producer.partitionsFor(topic);
+        PowerMock.verify(metadata);
+    }
 }

