kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5169; KafkaConsumer.close should be idempotent
Date Fri, 05 May 2017 07:51:35 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 05ea454df -> 715eae6da


KAFKA-5169; KafkaConsumer.close should be idempotent

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2968 from mjsax/kafka-5169-consumer-close


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/715eae6d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/715eae6d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/715eae6d

Branch: refs/heads/trunk
Commit: 715eae6da92d5d1db7ac65e3c8ff753e15b5c0ca
Parents: 05ea454
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Fri May 5 08:51:02 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri May 5 08:51:02 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/KafkaConsumer.java  |  2 ++
 .../kafka/clients/consumer/KafkaConsumerTest.java     | 14 ++++++++++----
 .../kafka/clients/producer/KafkaProducerTest.java     |  9 +++++++++
 3 files changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/715eae6d/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index da5b7fb..9df674d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1533,6 +1533,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
      * @throws IllegalArgumentException If the <code>timeout</code> is negative.
      */
     public void close(long timeout, TimeUnit timeUnit) {
+        if (closed)
+            return;
         if (timeout < 0)
             throw new IllegalArgumentException("The timeout cannot be negative.");
         acquire();

http://git-wip-us.apache.org/repos/asf/kafka/blob/715eae6d/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8e49350..a598b5d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import java.util.concurrent.ScheduledExecutorService;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
@@ -68,7 +67,9 @@ import org.apache.kafka.test.MockConsumerInterceptor;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -84,6 +85,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -98,9 +100,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.junit.Rule;
-import org.junit.rules.ExpectedException;
-
 public class KafkaConsumerTest {
     private final String topic = "test";
     private final TopicPartition tp0 = new TopicPartition(topic, 0);
@@ -1228,6 +1227,13 @@ public class KafkaConsumerTest {
         consumerCloseTest(Long.MAX_VALUE, Collections.<AbstractResponse>emptyList(),
0, true);
     }
 
+    @Test
+    public void closeShouldBeIdempotent() {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        consumer.close();
+        consumer.close();
+    }
+
     private void consumerCloseTest(final long closeTimeoutMs,
             List<? extends AbstractResponse> responses,
             long waitMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/715eae6d/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 514426d..ea493d2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -409,4 +409,13 @@ public class KafkaProducerTest {
 
     }
 
+    @Test
+    public void closeShouldBeIdempotent() {
+        Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        Producer producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(),
new ByteArraySerializer());
+        producer.close();
+        producer.close();
+    }
+
 }


Mime
View raw message