kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-3768; Replace all pattern matches on boolean values with if/else blocks.
Date Sat, 04 Jun 2016 20:25:19 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ff300c9d4 -> ab3560606


KAFKA-3768; Replace all pattern matches on boolean values with if/else blocks.

Author: Satendra kumar <satendra@knoldus.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1445 from satendrakumar06/remove_boolean_pattern_match
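
For context, the sketch below is illustrative code only (a hypothetical method, not taken from this patch) showing the shape of the refactoring applied throughout the files in the diff: a match with case true / case false on a Boolean scrutinee is replaced by the equivalent, more idiomatic if/else expression.

    // Illustrative example only (hypothetical method, not from the Kafka sources).
    // Before: pattern matching on a Boolean value.
    def describeBuffer(buffer: java.nio.ByteBuffer): String = {
      buffer.hasRemaining match {
        case true  => "bytes remaining"
        case false => "empty"
      }
    }

    // After: the equivalent if/else expression, as applied in the diff below.
    def describeBufferRefactored(buffer: java.nio.ByteBuffer): String = {
      if (buffer.hasRemaining) "bytes remaining" else "empty"
    }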


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ab356060
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ab356060
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ab356060

Branch: refs/heads/trunk
Commit: ab356060665b3b6502c7d531366b26e1e0f48f9c
Parents: ff300c9
Author: Satendra kumar <satendra@knoldus.com>
Authored: Sat Jun 4 21:24:11 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat Jun 4 21:24:45 2016 +0100

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 98 ++++++++++----------
 .../controller/PartitionLeaderSelector.scala    | 32 +++----
 .../controller/PartitionStateMachine.scala      | 11 +--
 .../kafka/controller/ReplicaStateMachine.scala  | 15 ++-
 .../message/ByteBufferBackedInputStream.scala   | 24 +++--
 .../main/scala/kafka/producer/Producer.scala    | 14 +--
 core/src/main/scala/kafka/tools/JmxTool.scala   |  7 +-
 .../security/auth/ZkAuthorizationTest.scala     | 26 +++---
 8 files changed, 111 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ab356060/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 6c503a5..d533a85 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -564,46 +564,45 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
    */
   def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
-    areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match {
-      case false =>
-        info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","),
topicAndPartition) +
-          "reassigned not yet caught up with the leader")
-        val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
-        val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
-        //1. Update AR in ZK with OAR + RAR.
-        updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
-        //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
-        updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
-          newAndOldReplicas.toSeq)
-        //3. replicas in RAR - OAR -> NewReplica
-        startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
-        info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
-          "reassigned to catch up with the leader")
-      case true =>
-        //4. Wait until all replicas in RAR are in sync with the leader.
-        val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
-        //5. replicas in RAR -> OnlineReplica
-        reassignedReplicas.foreach { replica =>
-          replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
-            replica)), OnlineReplica)
-        }
-        //6. Set AR to RAR in memory.
-        //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
-        //   a new AR (using RAR) and same isr to every broker in RAR
-        moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
-        //8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
-        //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
-        stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
-        //10. Update AR in ZK with RAR.
-        updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
-        //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
-        removePartitionFromReassignedPartitions(topicAndPartition)
-        info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
-        controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
-        //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
-        sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
-        // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
-        deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
+    if (!areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {
+      info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
+        "reassigned not yet caught up with the leader")
+      val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
+      val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
+      //1. Update AR in ZK with OAR + RAR.
+      updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
+      //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
+      updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
+        newAndOldReplicas.toSeq)
+      //3. replicas in RAR - OAR -> NewReplica
+      startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
+      info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
+        "reassigned to catch up with the leader")
+    } else {
+      //4. Wait until all replicas in RAR are in sync with the leader.
+      val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
+      //5. replicas in RAR -> OnlineReplica
+      reassignedReplicas.foreach { replica =>
+        replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
+          replica)), OnlineReplica)
+      }
+      //6. Set AR to RAR in memory.
+      //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
+      //   a new AR (using RAR) and same isr to every broker in RAR
+      moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
+      //8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
+      //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
+      stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
+      //10. Update AR in ZK with RAR.
+      updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
+      //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
+      removePartitionFromReassignedPartitions(topicAndPartition)
+      info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
+      controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
+      //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
+      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
+      // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
+      deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
     }
   }
 
@@ -853,16 +852,15 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
       partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
     } else {
       // check if the leader is alive or not
-      controllerContext.liveBrokerIds.contains(currentLeader) match {
-        case true =>
-          info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition)
+
-            "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(",")))
-          // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
-          updateLeaderEpochAndSendRequest(topicAndPartition, oldAndNewReplicas, reassignedReplicas)
-        case false =>
-          info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition)
+
-            "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
-          partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
+      if (controllerContext.liveBrokerIds.contains(currentLeader)) {
+        info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition)
+
+          "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(",")))
+        // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
+        updateLeaderEpochAndSendRequest(topicAndPartition, oldAndNewReplicas, reassignedReplicas)
+      } else {
+        info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition)
+
+          "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
+        partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab356060/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 9d8b0b6..682ce1d 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -57,8 +57,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
         val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
         val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
         val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
-        val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
-          case true =>
+        val newLeaderAndIsr =
+          if (liveBrokersInIsr.isEmpty) {
             // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
             // for unclean leader election.
             if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils,
@@ -67,28 +67,26 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
                 "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                 " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
             }
-
             debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned
replicas: %s"
               .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
-            liveAssignedReplicas.isEmpty match {
-              case true =>
-                throw new NoReplicaOnlineException(("No replica for partition " +
-                  "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds))
+
-                  " Assigned replicas are: [%s]".format(assignedReplicas))
-              case false =>
-                ControllerStats.uncleanLeaderElectionRate.mark()
-                val newLeader = liveAssignedReplicas.head
-                warn("No broker in ISR is alive for %s. Elect leader %d from live brokers
%s. There's potential data loss."
-                     .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
-                new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
+            if (liveAssignedReplicas.isEmpty) {
+              throw new NoReplicaOnlineException(("No replica for partition " +
+                "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds))
+
+                " Assigned replicas are: [%s]".format(assignedReplicas))
+            } else {
+              ControllerStats.uncleanLeaderElectionRate.mark()
+              val newLeader = liveAssignedReplicas.head
+              warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s.
There's potential data loss."
+                .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
+              new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
             }
-          case false =>
+          } else {
             val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
             val newLeader = liveReplicasInIsr.head
             debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
-                  .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
+              .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
             new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
-        }
+          }
         info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(),
topicAndPartition))
         (newLeaderAndIsr, liveAssignedReplicas)
       case None =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab356060/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index ec03b84..47efc51 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -245,12 +245,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       controllerContext.partitionLeadershipInfo.get(topicPartition) match {
         case Some(currentLeaderIsrAndEpoch) =>
           // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
-          controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader) match {
-            case true => // leader is alive
-              partitionState.put(topicPartition, OnlinePartition)
-            case false =>
-              partitionState.put(topicPartition, OfflinePartition)
-          }
+          if (controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader))
+            // leader is alive
+            partitionState.put(topicPartition, OnlinePartition)
+          else
+            partitionState.put(topicPartition, OfflinePartition)
         case None =>
           partitionState.put(topicPartition, NewPartition)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab356060/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 2fd8b95..d49b6af 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -330,14 +330,13 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
       val partition = topicPartition.partition
       assignedReplicas.foreach { replicaId =>
         val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
-        controllerContext.liveBrokerIds.contains(replicaId) match {
-          case true => replicaState.put(partitionAndReplica, OnlineReplica)
-          case false =>
-            // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
-            // This is required during controller failover since during controller failover a broker can go down,
-            // so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.
-            replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
-        }
+        if (controllerContext.liveBrokerIds.contains(replicaId))
+          replicaState.put(partitionAndReplica, OnlineReplica)
+        else
+          // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
+          // This is required during controller failover since during controller failover a broker can go down,
+          // so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.
+          replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab356060/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala b/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
index ce55c16..73dfd34 100644
--- a/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
@@ -22,21 +22,19 @@ import java.nio.ByteBuffer
 
 class ByteBufferBackedInputStream(buffer:ByteBuffer) extends InputStream {
   override def read():Int  = {
-    buffer.hasRemaining match {
-      case true =>
-        (buffer.get() & 0xFF)
-      case false => -1
-    }
+    if (buffer.hasRemaining)
+      buffer.get() & 0xFF
+    else
+      -1
   }
 
   override def read(bytes:Array[Byte], off:Int, len:Int):Int = {
-    buffer.hasRemaining match {
-      case true =>
-        // Read only what's left
-        val realLen = math.min(len, buffer.remaining())
-        buffer.get(bytes, off, realLen)
-        realLen
-      case false => -1
-    }
+    if (buffer.hasRemaining) {
+      // Read only what's left
+      val realLen = math.min(len, buffer.remaining())
+      buffer.get(bytes, off, realLen)
+      realLen
+    } else
+      -1
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab356060/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index c11ad21..c2f95ea 100755
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -74,13 +74,14 @@ class Producer[K,V](val config: ProducerConfig,
       if (hasShutdown.get)
         throw new ProducerClosedException
       recordStats(messages)
-      sync match {
-        case true => eventHandler.handle(messages)
-        case false => asyncSend(messages)
-      }
+      if (sync)
+        eventHandler.handle(messages)
+      else
+        asyncSend(messages)
     }
   }
 
+
   private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
     for (message <- messages) {
       producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark()
@@ -95,11 +96,10 @@ class Producer[K,V](val config: ProducerConfig,
           queue.offer(message)
         case _  =>
           try {
-            config.queueEnqueueTimeoutMs < 0 match {
-            case true =>
+            if (config.queueEnqueueTimeoutMs < 0) {
               queue.put(message)
               true
-            case _ =>
+            } else {
               queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
             }
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab356060/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 8112f9e..1dcfb19 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -92,9 +92,10 @@ object JmxTool extends Logging {
     val names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName])
 
     val numExpectedAttributes: Map[ObjectName, Int] =
-      attributesWhitelistExists match {
-        case true => queries.map((_, attributesWhitelist.get.size)).toMap
-        case false => names.map{(name: ObjectName) =>
+      if (attributesWhitelistExists)
+        queries.map((_, attributesWhitelist.get.size)).toMap
+      else {
+        names.map{(name: ObjectName) =>
           val mbean = mbsc.getMBeanInfo(name)
           (name, mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).size)}.toMap
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab356060/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index ab5324c..bbec5b1 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -196,14 +196,14 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
     // Additionally, we create the consumers znode (not in
     // securePersistentZkPaths) to make sure that we don't
     // add ACLs to it.
-    val secureOpt: String  = secondZk.isSecure match {
-      case true =>
+    val secureOpt: String =
+      if (secondZk.isSecure) {
         firstZk.createPersistentPath(ZkUtils.ConsumersPath)
         "secure"
-      case false =>
+      } else {
         secondZk.createPersistentPath(ZkUtils.ConsumersPath)
         "unsecure"
-    }
+      }
     ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl"))
     info("Done with migration")
     for (path <- secondZk.securePersistentZkPaths) {
@@ -231,15 +231,17 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    * Verifies ACL.
    */
   private def isAclCorrect(list: java.util.List[ACL], secure: Boolean): Boolean = {
-    val isListSizeCorrect = secure match {
-      case true => list.size == 2
-      case false => list.size == 1
-    } 
+    val isListSizeCorrect =
+      if (secure)
+        list.size == 2
+      else
+        list.size == 1
     isListSizeCorrect && list.asScala.forall(
-      secure match {
-        case true => isAclSecure
-        case false => isAclUnsecure
-      })
+      if (secure)
+        isAclSecure
+      else
+        isAclUnsecure
+    )
   }
   
   /**

