kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-7678: Avoid NPE when closing the RecordCollector (#5993)
Date Wed, 05 Dec 2018 22:36:31 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 69a4b2c  KAFKA-7678: Avoid NPE when closing the RecordCollector (#5993)
69a4b2c is described below

commit 69a4b2ceacfae108be99ec0f63fdf2d6efd0dfcb
Author: Jonathan Santilli <jonathansantilli@users.noreply.github.com>
AuthorDate: Wed Dec 5 20:48:39 2018 +0100

    KAFKA-7678: Avoid NPE when closing the RecordCollector (#5993)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
---
 .../streams/processor/internals/RecordCollectorImpl.java      |  6 ++++--
 .../streams/processor/internals/RecordCollectorTest.java      | 11 +++++++++++
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 71f5be5..8cd8a3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -235,8 +235,10 @@ public class RecordCollectorImpl implements RecordCollector {
     @Override
     public void close() {
         log.debug("Closing producer");
-        producer.close();
-        producer = null;
+        if (producer != null) {
+            producer.close();
+            producer = null;
+        }
         checkForException();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 6b7549f..a1515ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -297,4 +297,15 @@ public class RecordCollectorTest {
         });
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
     }
+
+    @Test
+    public void testShouldNotThrowNPEOnCloseIfProducerIsNotInitialized() {
+        final RecordCollectorImpl collector = new RecordCollectorImpl(
+                "NoNPE",
+                logContext,
+                new DefaultProductionExceptionHandler()
+        );
+
+        collector.close();
+    }
 }


Mime
View raw message