kafka-commits mailing list archives

From ij...@apache.org
Subject [1/3] kafka git commit: KAFKA-3771; Improving Kafka core code
Date Mon, 06 Jun 2016 07:46:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2c7fae0a4 -> 79aaf19f2


http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index d37de76..40ad0f3 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -55,8 +55,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
   val message = "hello"
 
   var producer: KafkaProducer[Integer, String] = null
-  def hwFile1 = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename))
-  def hwFile2 = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
+  def hwFile1 = new OffsetCheckpoint(new File(configProps1.logDirs.head, ReplicaManager.HighWatermarkFilename))
+  def hwFile2 = new OffsetCheckpoint(new File(configProps2.logDirs.head, ReplicaManager.HighWatermarkFilename))
   var servers = Seq.empty[KafkaServer]
 
  // Some tests restart the brokers then produce more data. But since test brokers use random ports, we need
@@ -95,7 +95,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     producer.close()
     for (server <- servers) {
       server.shutdown()
-      Utils.delete(new File(server.config.logDirs(0)))
+      Utils.delete(new File(server.config.logDirs.head))
     }
     super.tearDown()
   }
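
Note on the .head cleanups above: logDirs is a Seq[String], and .head states
"first element" directly, where (0) reads as positional indexing. A minimal
standalone sketch, with hypothetical values:

    val logDirs: Seq[String] = Seq("/tmp/kafka-logs-1", "/tmp/kafka-logs-2")

    val first = logDirs.head        // idiomatic first element
    val same  = logDirs(0)          // equivalent, but reads as random access

    // Both throw on an empty Seq; headOption is the safe variant.
    val maybeFirst: Option[String] = logDirs.headOption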

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 312edd4..e0b6db4 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -177,8 +177,8 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
 
   def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = {
     for(logDir <- logDirs) {
-      val brokerMetadataOpt = (new BrokerMetadataCheckpoint(
-        new File(logDir + File.separator + brokerMetaPropsFile))).read()
+      val brokerMetadataOpt = new BrokerMetadataCheckpoint(
+        new File(logDir + File.separator + brokerMetaPropsFile)).read()
       brokerMetadataOpt match {
         case Some(brokerMetadata: BrokerMetadata) =>
           if (brokerMetadata.brokerId != brokerId)  return false
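
Side note on the parenthesis removal above: new C(args).m() already invokes
m on the freshly constructed instance, so the wrapping parentheses were
redundant. A minimal sketch, using a stand-in class:

    class Checkpoint(path: String) { def read(): Option[String] = None }  // stand-in

    val a = (new Checkpoint("meta.properties")).read()  // extra parens
    val b = new Checkpoint("meta.properties").read()    // same parse, cleaner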

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 689b70b..7741698 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -105,7 +105,7 @@ class SimpleFetchTest {
     val partition = replicaManager.getOrCreatePartition(topic, partitionId)
 
     // create the leader replica with the local log
-    val leaderReplica = new Replica(configs(0).brokerId, partition, time, 0, Some(log))
+    val leaderReplica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
     leaderReplica.highWatermark = new LogOffsetMetadata(partitionHW)
     partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId)
 
@@ -144,15 +144,15 @@ class SimpleFetchTest {
    */
   @Test
   def testReadFromLog() {
-    val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count();
-    val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count();
+    val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count()
+    val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()
 
     assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW,
       replicaManager.readFromLocalLog(true, true, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message)
     assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO,
       replicaManager.readFromLocalLog(true, false, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message)
 
-    assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count());
-    assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count());
+    assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count())
+    assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count())
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
index 37d334b..741eec9 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
@@ -69,10 +69,10 @@ class ConsoleProducerTest {
   @Test
   def testParseKeyProp(): Unit = {
     val config = new ConsoleProducer.ProducerConfig(validArgs)
-    val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[LineMessageReader];
+    val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[LineMessageReader]
     reader.init(System.in,ConsoleProducer.getReaderProps(config))
     assert(reader.keySeparator == "#")
-    assert(reader.parseKey == true)
+    assert(reader.parseKey)
   }
 
 }
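
The assert change above works because parseKey is already a Boolean, so
comparing it to true adds nothing; the dropped semicolon is inferred by the
compiler. A minimal sketch:

    val parseKey = true        // stand-in for reader.parseKey

    assert(parseKey == true)   // redundant comparison
    assert(parseKey)           // equivalent and clearer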

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
index 56f5905..6a40510 100644
--- a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
@@ -22,7 +22,7 @@ import org.junit.{Test, After, Before}
 
 class IteratorTemplateTest extends Assertions {
   
-  val lst = (0 until 10)
+  val lst = 0 until 10
   val iterator = new IteratorTemplate[Int]() {
     var i = 0
     override def makeNext() = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index 7c4b951..f39fa6b 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -51,7 +51,7 @@ object JaasTestUtils {
         entries = Map(
           "username" -> username,
           "password" -> password
-        ) ++ validUsers.map { case (user, pass) => (s"user_$user"-> pass)}
+        ) ++ validUsers.map { case (user, pass) => s"user_$user" -> pass }
       )
     }
   }
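
For the map cleanup above: a -> b already constructs the pair (a, b), so the
parentheses around the arrow expression were noise. A minimal sketch, with
hypothetical users:

    val validUsers = Map("alice" -> "alice-secret")
    val entries = validUsers.map { case (user, pass) => s"user_$user" -> pass }
    // entries == Map("user_alice" -> "alice-secret")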

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
index 434c22a..e9dbbb1 100644
--- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
@@ -56,7 +56,7 @@ class MockScheduler(val time: Time) extends Scheduler {
   def tick() {
     this synchronized {
       val now = time.milliseconds
-      while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
+      while(tasks.nonEmpty && tasks.head.nextExecution <= now) {
         /* pop and execute the task with the lowest next execution time */
         val curr = tasks.dequeue
         curr.fun()
@@ -78,7 +78,7 @@ class MockScheduler(val time: Time) extends Scheduler {
   
 }
 
-case class MockTask(val name: String, val fun: () => Unit, var nextExecution: Long, val period: Long) extends Ordered[MockTask] {
+case class MockTask(name: String, fun: () => Unit, var nextExecution: Long, period: Long) extends Ordered[MockTask] {
   def periodic = period >= 0
   def compare(t: MockTask): Int = {
     if(t.nextExecution == nextExecution)
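
Two idioms appear in this file: nonEmpty avoids the double negative
!isEmpty, and case-class parameters are public vals by default, so the
explicit val modifiers were redundant (the var on the mutable field must
stay). A minimal sketch:

    import scala.collection.mutable

    val tasks = mutable.Queue(1, 2, 3)
    while (tasks.nonEmpty) tasks.dequeue()   // clearer than !tasks.isEmpty

    case class Task(name: String, var nextExecution: Long, period: Long)
    val t = Task("t1", nextExecution = 0L, period = 100L)
    t.nextExecution = 5L                     // var field remains mutable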

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/utils/MockTime.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/MockTime.scala b/core/src/test/scala/unit/kafka/utils/MockTime.scala
index 0858e04..21fb4d9 100644
--- a/core/src/test/scala/unit/kafka/utils/MockTime.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockTime.scala
@@ -46,7 +46,7 @@ class MockTime(@volatile private var currentMs: Long) extends Time {
     scheduler.tick()
   }
   
-  override def toString() = "MockTime(%d)".format(milliseconds)
+  override def toString = "MockTime(%d)".format(milliseconds)
 
 }
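
The toString change above follows the convention that pure, parameterless
methods are declared without an empty parameter list. A minimal sketch:

    class MockClock(var nowMs: Long) {                // stand-in class
      override def toString = s"MockClock($nowMs)"    // no () on a pure accessor
    }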
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7df87fc..b42a6ba 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -94,7 +94,7 @@ object TestUtils extends Logging {
     val parentFile = new File(parent)
     parentFile.mkdirs()
 
-    org.apache.kafka.test.TestUtils.tempDirectory(parentFile.toPath, "kafka-");
+    org.apache.kafka.test.TestUtils.tempDirectory(parentFile.toPath, "kafka-")
   }
 
   /**
@@ -335,12 +335,12 @@ object TestUtils extends Logging {
 
     // check if the expected iterator is longer
     if (expected.hasNext) {
-      var length1 = length;
+      var length1 = length
       while (expected.hasNext) {
         expected.next
         length1 += 1
       }
-      assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true);
+      assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true)
     }
 
     // check if the actual iterator was longer
@@ -350,7 +350,7 @@ object TestUtils extends Logging {
         actual.next
         length2 += 1
       }
-      assertFalse("Iterators have uneven length-- second has more: "+length2 + " > " + length, true);
+      assertFalse("Iterators have uneven length-- second has more: "+length2 + " > " + length, true)
     }
   }
 
@@ -671,7 +671,7 @@ object TestUtils extends Logging {
         try{
           val currentLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition)
           var newLeaderAndIsr: LeaderAndIsr = null
-          if(currentLeaderAndIsrOpt == None)
+          if(currentLeaderAndIsrOpt.isEmpty)
             newLeaderAndIsr = new LeaderAndIsr(leader, List(leader))
           else{
             newLeaderAndIsr = currentLeaderAndIsrOpt.get
@@ -716,7 +716,7 @@ object TestUtils extends Logging {
           } else if (oldLeaderOpt.isDefined && oldLeaderOpt.get != l) {
             trace("Leader for partition [%s,%d] is changed from %d to %d".format(topic, partition, oldLeaderOpt.get, l))
             isLeaderElectedOrChanged = true
-          } else if (!oldLeaderOpt.isDefined) {
+          } else if (oldLeaderOpt.isEmpty) {
             trace("Leader %d is elected for partition [%s,%d]".format(l, topic, partition))
             isLeaderElectedOrChanged = true
           } else {
@@ -856,7 +856,7 @@ object TestUtils extends Logging {
     // in sync replicas should not have any replica that is not in the new assigned replicas
     val phantomInSyncReplicas = inSyncReplicas.toSet -- assignedReplicas.toSet
     assertTrue("All in sync replicas %s must be in the assigned replica list %s".format(inSyncReplicas, assignedReplicas),
-      phantomInSyncReplicas.size == 0)
+      phantomInSyncReplicas.isEmpty)
   }
 
   def ensureNoUnderReplicatedPartitions(zkUtils: ZkUtils, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int],
@@ -1031,7 +1031,7 @@ object TestUtils extends Logging {
       "Topic path /brokers/topics/%s not deleted after /admin/delete_topic/%s path is deleted".format(topic, topic))
     // ensure that the topic-partition has been deleted from all brokers' replica managers
     TestUtils.waitUntilTrue(() =>
-      servers.forall(server => topicAndPartitions.forall(tp => server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
+      servers.forall(server => topicAndPartitions.forall(tp => server.replicaManager.getPartition(tp.topic, tp.partition).isEmpty)),
       "Replica manager's should have deleted all of this topic's partitions")
     // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
     assertTrue("Replica logs not deleted after delete topic is complete",
@@ -1146,7 +1146,7 @@ class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
 @deprecated("This class is deprecated and it will be removed in a future release.", "0.10.0.0")
 class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner {
   def partition(data: Any, numPartitions: Int): Int = {
-    (data.asInstanceOf[String].length % numPartitions)
+    data.asInstanceOf[String].length % numPartitions
   }
 }
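
The TestUtils hunks above replace == None, !isDefined, and size == 0 with
the direct predicates. A minimal sketch of the equivalences, with
hypothetical values:

    val leaderOpt: Option[Int] = None

    leaderOpt.isEmpty      // same truth value as leaderOpt == None
    leaderOpt.isDefined    // same as !leaderOpt.isEmpty

    val phantom = Set.empty[Int]
    phantom.isEmpty        // same as phantom.size == 0, without counting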
 

