kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-7678: Avoid NPE when closing the RecordCollector (#5993)
Date Wed, 05 Dec 2018 19:56:48 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new b46c9bf  KAFKA-7678: Avoid NPE when closing the RecordCollector (#5993)
b46c9bf is described below

commit b46c9bfca1bf03702852326a9d3fce71e93298b3
Author: Jonathan Santilli <jonathansantilli@users.noreply.github.com>
AuthorDate: Wed Dec 5 20:48:39 2018 +0100

    KAFKA-7678: Avoid NPE when closing the RecordCollector (#5993)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
---
 .../streams/processor/internals/RecordCollectorImpl.java     |  6 ++++--
 .../streams/processor/internals/RecordCollectorTest.java     | 12 ++++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 5df14ee..d3a0030 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -249,8 +249,10 @@ public class RecordCollectorImpl implements RecordCollector {
     @Override
     public void close() {
         log.debug("Closing producer");
-        producer.close();
-        producer = null;
+        if (producer != null) {
+            producer.close();
+            producer = null;
+        }
         checkForException();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index c4e58be..22163ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -387,6 +387,18 @@ public class RecordCollectorTest {
         }
     }
 
+    @Test
+    public void testShouldNotThrowNPEOnCloseIfProducerIsNotInitialized() {
+        final RecordCollectorImpl collector = new RecordCollectorImpl(
+                "NoNPE",
+                logContext,
+                new DefaultProductionExceptionHandler(),
+                new Metrics().sensor("skipped-records")
+        );
+
+        collector.close();
+    }
+
     private static class CustomStringSerializer extends StringSerializer {
 
         private boolean isKey;


Mime
View raw message