kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch 2.3 updated: KAFKA-8731: InMemorySessionStore throws NullPointerException on startup (#7132)
Date Wed, 31 Jul 2019 21:31:58 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new f86f7e2  KAFKA-8731: InMemorySessionStore throws NullPointerException on startup (#7132)
f86f7e2 is described below

commit f86f7e21d986b7cda3cf8c2e53b8500625846fa2
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Wed Jul 31 14:19:46 2019 -0700

    KAFKA-8731: InMemorySessionStore throws NullPointerException on startup (#7132)
    
    Reviewers:  Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bbejeck@gmail.com>
---
 .../kafka/streams/state/internals/InMemorySessionStore.java       | 8 ++++++++
 .../kafka/streams/state/internals/SessionBytesStoreTest.java      | 5 +++++
 2 files changed, 13 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index f3b8565..ebe9878 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -120,7 +120,15 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
     @Override
     public void remove(final Windowed<Bytes> sessionKey) {
         final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(sessionKey.window().end());
+        if (keyMap == null) {
+            return;
+        }
+
         final ConcurrentNavigableMap<Long, byte[]> startTimeMap = keyMap.get(sessionKey.key());
+        if (startTimeMap == null) {
+            return;
+        }
+
         startTimeMap.remove(sessionKey.window().start());
 
         if (startTimeMap.isEmpty()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionBytesStoreTest.java
index a6cef81..9b29e8b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionBytesStoreTest.java
@@ -472,6 +472,11 @@ public abstract class SessionBytesStoreTest {
         assertThat(messages, hasItem("Skipping record for expired segment."));
     }
 
+    @Test
+    public void shouldNotThrowExceptionRemovingNonexistentKey() {
+        sessionStore.remove(new Windowed<>("a", new SessionWindow(0, 1)));
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() {
         sessionStore.findSessions(null, 1L, 2L);


Mime
View raw message