kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Avoid duplicate processing of notifications in ZkNodeChangeNotificationListener
Date Fri, 04 Aug 2017 10:55:40 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e7f7d4093 -> 62f03ba2c


MINOR: Avoid duplicate processing of notifications in ZkNodeChangeNotificationListener

Also fix potential NPE.

Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3615 from omkreddy/zk-notif-duplicates


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62f03ba2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62f03ba2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62f03ba2

Branch: refs/heads/trunk
Commit: 62f03ba2cd27880562dbf85c5ef6699d25bc5b43
Parents: e7f7d40
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Authored: Fri Aug 4 11:39:56 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Aug 4 11:55:01 2017 +0100

----------------------------------------------------------------------
 .../ZkNodeChangeNotificationListener.scala      |  8 ++++---
 .../ZkNodeChangeNotificationListenerTest.scala  | 24 ++++++++++++++------
 2 files changed, 22 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/62f03ba2/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 37d35ff..450707b 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -93,12 +93,14 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
           val changeId = changeNumber(notification)
           if (changeId > lastExecutedChange) {
             val changeZnode = seqNodeRoot + "/" + notification
-            val (data, _) = zkUtils.readDataMaybeNull(changeZnode)
-            data.map(notificationHandler.processNotification(_)).getOrElse {
+            val data = zkUtils.readDataMaybeNull(changeZnode)._1.orNull
+            if (data != null) {
+              notificationHandler.processNotification(data)
+            } else {
               logger.warn(s"read null data from $changeZnode when processing notification $notification")
             }
+            lastExecutedChange = changeId
           }
-          lastExecutedChange = changeId
         }
         purgeObsoleteNotifications(now, notifications)
       } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/62f03ba2/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index 368ee0d..a646ced 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -41,21 +41,31 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
     val seqNodePath = seqNodeRoot + "/" + seqNodePrefix
     val notificationMessage1 = "message1"
     val notificationMessage2 = "message2"
-    val changeExpirationMs = 100
+    val changeExpirationMs = 1000
 
     val notificationListener = new ZkNodeChangeNotificationListener(zkUtils, seqNodeRoot, seqNodePrefix, notificationHandler, changeExpirationMs)
     notificationListener.init()
 
     zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage1)
 
-    TestUtils.waitUntilTrue(() => invocationCount == 1 && notification == notificationMessage1, "failed to send/process notification message in the timeout period.")
+    TestUtils.waitUntilTrue(() => invocationCount == 1 && notification == notificationMessage1,
+      "Failed to send/process notification message in the timeout period.")
 
-    /*There is no easy way to test that purging. Even if we mock kafka time with MockTime, the purging compares kafka time with the time stored in zookeeper stat and the
-    embeded zookeeper server does not provide a way to mock time. so to test purging we will have to use Time.SYSTEM.sleep(changeExpirationMs + 1) issue a write and check
-    Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size) however even after that the assertion can fail as the second node it self can be deleted
-    depending on how threads get scheduled.*/
+    /*
+     * There is no easy way to test purging. Even if we mock kafka time with MockTime, the purging compares kafka time
+     * with the time stored in zookeeper stat and the embedded zookeeper server does not provide a way to mock time.
+     * So to test purging we would have to use Time.SYSTEM.sleep(changeExpirationMs + 1) issue a write and check
+     * Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size). However even that the assertion
+     * can fail as the second node can be deleted depending on how threads get scheduled.
+     */
 
     zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage2)
-    TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == notificationMessage2, "failed to send/process notification message in the timeout period.")
+    TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == notificationMessage2,
+      "Failed to send/process notification message in the timeout period.")
+
+    (3 to 10).foreach(i => zkUtils.createSequentialPersistentPath(seqNodePath, "message" + i))
+
+    TestUtils.waitUntilTrue(() => invocationCount == 10 ,
+      s"Expected 10 invocations of processNotifications, but there were $invocationCount")
   }
 }


Mime
View raw message