kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-4284: Make Partitioner a Closeable and close it when closing the producer
Date Tue, 08 Nov 2016 17:53:37 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8f2e0a5ec -> 471a2ab17


KAFKA-4284: Make Partitioner a Closeable and close it when closing the producer

[KAFKA-4284](https://issues.apache.org/jira/browse/KAFKA-4284)

Even though Partitioner has a close method, it is not closed when the producer is closed. Serializers,
interceptors and metrics are all closed, so partitioners should be closed too.

To be able to use the same mechanism to close the partitioner as the serializers, etc., I had
to make the `Partitioner` interface extend `Closeable`. Since `Partitioner` already declares a
`close` method, this adds no new methods to the interface, so it feels OK and should be backwards compatible.

Looking at [KAFKA-2091](https://issues.apache.org/jira/browse/KAFKA-2091) (d6c45c70fb9773043766446e88370db9709e7995)
that introduced the `Partitioner` interface it looks like the intention was that the producer
should close the partitioner.

This contribution is my original work and I license the work to the project under the project's
open source license.

Author: Theo <theo@iconara.net>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2000 from iconara/kafka-4284


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/471a2ab1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/471a2ab1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/471a2ab1

Branch: refs/heads/trunk
Commit: 471a2ab17df66c52e0a98afa81e0b360560a5e9a
Parents: 8f2e0a5
Author: Theo <theo@iconara.net>
Authored: Tue Nov 8 09:27:11 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Tue Nov 8 09:27:11 2016 -0800

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  1 +
 .../kafka/clients/producer/Partitioner.java     |  4 +-
 .../clients/producer/KafkaProducerTest.java     | 22 +++++++++
 .../org/apache/kafka/test/MockPartitioner.java  | 51 ++++++++++++++++++++
 4 files changed, 77 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/471a2ab1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 489c762..54a5474 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -722,6 +722,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         ClientUtils.closeQuietly(metrics, "producer metrics", firstException);
         ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException);
         ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
+        ClientUtils.closeQuietly(partitioner, "producer partitioner", firstException);
         AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
         log.debug("The Kafka producer has closed.");
         if (firstException.get() != null && !swallowException)

http://git-wip-us.apache.org/repos/asf/kafka/blob/471a2ab1/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
index 383619d..c973cd3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
@@ -20,11 +20,13 @@ package org.apache.kafka.clients.producer;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.Cluster;
 
+import java.io.Closeable;
+
 /**
  * Partitioner Interface
  */
 
-public interface Partitioner extends Configurable {
+public interface Partitioner extends Configurable, Closeable {
 
     /**
      * Compute the partition for the given record.

http://git-wip-us.apache.org/repos/asf/kafka/blob/471a2ab1/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index c82b18b..90256bb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockProducerInterceptor;
 import org.apache.kafka.test.MockSerializer;
+import org.apache.kafka.test.MockPartitioner;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Test;
@@ -119,6 +120,27 @@ public class KafkaProducerTest {
     }
 
     @Test
+    public void testPartitionerClose() throws Exception {
+        try {
+            Properties props = new Properties();
+            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName());
+
+            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
+                    props, new StringSerializer(), new StringSerializer());
+            Assert.assertEquals(1, MockPartitioner.INIT_COUNT.get());
+            Assert.assertEquals(0, MockPartitioner.CLOSE_COUNT.get());
+
+            producer.close();
+            Assert.assertEquals(1, MockPartitioner.INIT_COUNT.get());
+            Assert.assertEquals(1, MockPartitioner.CLOSE_COUNT.get());
+        } finally {
+            // cleanup since we are using mutable static variables in MockPartitioner
+            MockPartitioner.resetCounters();
+        }
+    }
+
+    @Test
     public void testOsDefaultSocketBufferSizes() throws Exception {
         Map<String, Object> config = new HashMap<>();
         config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");

http://git-wip-us.apache.org/repos/asf/kafka/blob/471a2ab1/clients/src/test/java/org/apache/kafka/test/MockPartitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockPartitioner.java b/clients/src/test/java/org/apache/kafka/test/MockPartitioner.java
new file mode 100644
index 0000000..5532409
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/MockPartitioner.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.clients.producer.Partitioner;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MockPartitioner implements Partitioner {
+    public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
+
+    public MockPartitioner() {
+        INIT_COUNT.incrementAndGet();
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
+
+    @Override
+    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+        return 0;
+    }
+
+    @Override
+    public void close() {
+        CLOSE_COUNT.incrementAndGet();
+    }
+    
+    public static void resetCounters() {
+        INIT_COUNT.set(0);
+        CLOSE_COUNT.set(0);
+    }
+}


Mime
View raw message