kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Avoid duplicate processing of notifications in ZkNodeChangeNotificationListener
Date Fri, 04 Aug 2017 10:55:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 44d4b8ebc -> dd712f14b


MINOR: Avoid duplicate processing of notifications in ZkNodeChangeNotificationListener

Also fix potential NPE.

Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3615 from omkreddy/zk-notif-duplicates

(cherry picked from commit 62f03ba2cd27880562dbf85c5ef6699d25bc5b43)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd712f14
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd712f14
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd712f14

Branch: refs/heads/0.11.0
Commit: dd712f14badb7f116eb63d3a59c2ea9c22d235eb
Parents: 44d4b8e
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Authored: Fri Aug 4 11:39:56 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Aug 4 11:55:45 2017 +0100

----------------------------------------------------------------------
 .../ZkNodeChangeNotificationListener.scala      |  8 ++++---
 .../ZkNodeChangeNotificationListenerTest.scala  | 24 ++++++++++++++------
 2 files changed, 22 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dd712f14/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 960f690..5f6f8bb 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -93,12 +93,14 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
           val changeId = changeNumber(notification)
           if (changeId > lastExecutedChange) {
             val changeZnode = seqNodeRoot + "/" + notification
-            val (data, _) = zkUtils.readDataMaybeNull(changeZnode)
-            data.map(notificationHandler.processNotification(_)).getOrElse {
+            val data = zkUtils.readDataMaybeNull(changeZnode)._1.orNull
+            if (data != null) {
+              notificationHandler.processNotification(data)
+            } else {
               logger.warn(s"read null data from $changeZnode when processing notification
$notification")
             }
+            lastExecutedChange = changeId
           }
-          lastExecutedChange = changeId
         }
         purgeObsoleteNotifications(now, notifications)
       } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd712f14/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index f7dd40f..85ad9f0 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -41,21 +41,31 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
     val seqNodePath = seqNodeRoot + "/" + seqNodePrefix
     val notificationMessage1 = "message1"
     val notificationMessage2 = "message2"
-    val changeExpirationMs = 100
+    val changeExpirationMs = 1000
 
     val notificationListener = new ZkNodeChangeNotificationListener(zkUtils, seqNodeRoot,
seqNodePrefix, notificationHandler, changeExpirationMs)
     notificationListener.init()
 
     zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage1)
 
-    TestUtils.waitUntilTrue(() => invocationCount == 1 && notification == notificationMessage1,
"failed to send/process notification message in the timeout period.")
+    TestUtils.waitUntilTrue(() => invocationCount == 1 && notification == notificationMessage1,
+      "Failed to send/process notification message in the timeout period.")
 
-    /*There is no easy way to test that purging. Even if we mock kafka time with MockTime,
the purging compares kafka time with the time stored in zookeeper stat and the
-    embeded zookeeper server does not provide a way to mock time. so to test purging we will
have to use Time.SYSTEM.sleep(changeExpirationMs + 1) issue a write and check
-    Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size) however even
after that the assertion can fail as the second node it self can be deleted
-    depending on how threads get scheduled.*/
+    /*
+     * There is no easy way to test purging. Even if we mock kafka time with MockTime, the
purging compares kafka time
+     * with the time stored in zookeeper stat and the embedded zookeeper server does not
provide a way to mock time.
+     * So to test purging we would have to use Time.SYSTEM.sleep(changeExpirationMs + 1)
issue a write and check
+     * Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size). However even
that the assertion
+     * can fail as the second node can be deleted depending on how threads get scheduled.
+     */
 
     zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage2)
-    TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == notificationMessage2,
"failed to send/process notification message in the timeout period.")
+    TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == notificationMessage2,
+      "Failed to send/process notification message in the timeout period.")
+
+    (3 to 10).foreach(i => zkUtils.createSequentialPersistentPath(seqNodePath, "message"
+ i))
+
+    TestUtils.waitUntilTrue(() => invocationCount == 10 ,
+      s"Expected 10 invocations of processNotifications, but there were $invocationCount")
   }
 }


Mime
View raw message