kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Add maybeThrow method to ZooKeeperClient AsyncResponse
Date Wed, 06 Dec 2017 14:55:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 078fd2136 -> fd8f182cc


MINOR: Add maybeThrow method to ZooKeeperClient AsyncResponse

* Add maybeThrow method to AsyncResponse
* Update KafkaZkClient to use newly introduced maybeThrow
* Change AsyncResponse from trait to abstract class for
more readable stacktraces (there's no benefit in using a
trait here)

Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #4266 from omkreddy/KAFKAZKCLEINT_EXCEPTION_CLEANUP


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fd8f182c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fd8f182c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fd8f182c

Branch: refs/heads/trunk
Commit: fd8f182cc4ea80c6d394ca68a47c0ee26b114ebe
Parents: 078fd21
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Authored: Wed Dec 6 16:25:25 2017 +0200
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Dec 6 16:27:21 2017 +0200

----------------------------------------------------------------------
 .../src/main/scala/kafka/zk/KafkaZkClient.scala | 41 ++++++++------------
 .../scala/kafka/zookeeper/ZooKeeperClient.scala | 10 ++++-
 .../kafka/zookeeper/ZooKeeperClientTest.scala   |  5 ++-
 3 files changed, 30 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fd8f182c/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index d4c7daa..55bfdfc 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -53,7 +53,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   def createSequentialPersistentPath(path: String, data: String = ""): String = {
     val createRequest = CreateRequest(path, data.getBytes("UTF-8"), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
-    createResponse.resultException.foreach(e => throw e)
+    createResponse.maybeThrow
     createResponse.path
   }
 
@@ -229,7 +229,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     val setDataResponse = set(configData)
     setDataResponse.resultCode match {
       case Code.NONODE => create(configData)
-      case _ => setDataResponse.resultException.foreach(e => throw e)
+      case _ => setDataResponse.maybeThrow
     }
   }
 
@@ -251,9 +251,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     val path = ConfigEntityChangeNotificationSequenceZNode.createPath
     val createRequest = CreateRequest(path, ConfigEntityChangeNotificationSequenceZNode.encode(sanitizedEntityPath), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
-    if (createResponse.resultCode != Code.OK) {
-      createResponse.resultException.foreach(e => throw e)
-    }
+    createResponse.maybeThrow
   }
 
   /**
@@ -324,9 +322,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    */
   def setTopicAssignment(topic: String, assignment: Map[TopicPartition, Seq[Int]]) = {
     val setDataResponse = setTopicAssignmentRaw(topic, assignment)
-    if (setDataResponse.resultCode != Code.OK) {
-      setDataResponse.resultException.foreach(e => throw e)
-    }
+    setDataResponse.maybeThrow
   }
 
   /**
@@ -379,7 +375,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     if (getChildrenResponse.resultCode == Code.OK) {
       deleteLogDirEventNotifications(getChildrenResponse.children)
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
-      getChildrenResponse.resultException.foreach(e => throw e)
+      getChildrenResponse.maybeThrow
     }
   }
 
@@ -644,8 +640,8 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     setDataResponse.resultCode match {
       case Code.NONODE =>
         val createDataResponse = create(reassignmentData)
-        createDataResponse.resultException.foreach(e => throw e)
-      case _ => setDataResponse.resultException.foreach(e => throw e)
+        createDataResponse.maybeThrow
+      case _ => setDataResponse.maybeThrow
     }
   }
 
@@ -658,10 +654,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     val createRequest = CreateRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment),
       acls(ReassignPartitionsZNode.path), CreateMode.PERSISTENT)
     val createResponse = retryRequestUntilConnected(createRequest)
-
-    if (createResponse.resultCode != Code.OK) {
-      throw createResponse.resultException.get
-    }
+    createResponse.maybeThrow
   }
 
   /**
@@ -766,7 +759,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     if (getChildrenResponse.resultCode == Code.OK) {
       deleteIsrChangeNotifications(getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber))
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
-      getChildrenResponse.resultException.foreach(e => throw e)
+      getChildrenResponse.maybeThrow
     }
   }
 
@@ -930,7 +923,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     val path = AclChangeNotificationSequenceZNode.createPath
     val createRequest = CreateRequest(path, AclChangeNotificationSequenceZNode.encode(resourceName), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
-    createResponse.resultException.foreach(e => throw e)
+    createResponse.maybeThrow
   }
 
   def propagateLogDirEvent(brokerId: Int) {
@@ -949,7 +942,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     if (getChildrenResponse.resultCode == Code.OK) {
       deleteAclChangeNotifications(getChildrenResponse.children)
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
-      getChildrenResponse.resultException.foreach(e => throw e)
+      getChildrenResponse.maybeThrow
     }
   }
 
@@ -964,8 +957,8 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
 
     val deleteResponses = retryRequestsUntilConnected(deleteRequests)
     deleteResponses.foreach { deleteResponse =>
-      if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
-        deleteResponse.resultException.foreach(e => throw e)
+      if (deleteResponse.resultCode != Code.NONODE) {
+        deleteResponse.maybeThrow
       }
     }
   }
@@ -1130,7 +1123,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     if (setDataResponse.resultCode == Code.NONODE) {
       createConsumerOffset(group, topicPartition, offset)
     } else {
-      setDataResponse.resultException.foreach(e => throw e)
+      setDataResponse.maybeThrow
     }
   }
 
@@ -1202,13 +1195,13 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     var createResponse = retryRequestUntilConnected(createRequest)
 
     if (throwIfPathExists && createResponse.resultCode == Code.NODEEXISTS) {
-      createResponse.resultException.foreach(e => throw e)
+      createResponse.maybeThrow
     } else if (createResponse.resultCode == Code.NONODE) {
       createRecursive0(parentPath(path))
       createResponse = retryRequestUntilConnected(createRequest)
-      createResponse.resultException.foreach(e => throw e)
+      createResponse.maybeThrow
     } else if (createResponse.resultCode != Code.NODEEXISTS)
-      createResponse.resultException.foreach(e => throw e)
+      createResponse.maybeThrow
 
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd8f182c/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index ef17059..a75181b 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -388,7 +388,7 @@ case class GetChildrenRequest(path: String, ctx: Option[Any] = None) extends Asy
   type Response = GetChildrenResponse
 }
 
-sealed trait AsyncResponse {
+sealed abstract class AsyncResponse {
   def resultCode: Code
   def path: String
   def ctx: Option[Any]
@@ -396,6 +396,14 @@ sealed trait AsyncResponse {
   /** Return None if the result code is OK and KeeperException otherwise. */
   def resultException: Option[KeeperException] =
     if (resultCode == Code.OK) None else Some(KeeperException.create(resultCode, path))
+
+  /**
+   * Throw KeeperException if the result code is not OK.
+   */
+  def maybeThrow(): Unit = {
+    if (resultCode != Code.OK)
+      throw KeeperException.create(resultCode, path)
+  }
 }
 case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String) extends AsyncResponse
 case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any]) extends AsyncResponse

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd8f182c/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index ea6a475..643d502 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -25,7 +25,7 @@ import javax.security.auth.login.Configuration
 
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
 import org.apache.zookeeper.{CreateMode, ZooDefs}
 import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
 import org.junit.{After, Test}
@@ -60,6 +60,9 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   def testDeleteNonExistentZNode(): Unit = {
     val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
     assertEquals("Response code should be NONODE", Code.NONODE, deleteResponse.resultCode)
+    intercept[NoNodeException] {
+      deleteResponse.maybeThrow()
+    }
   }
 
   @Test


Mime
View raw message