kafka-commits mailing list archives

From jun...@apache.org
Subject [1/3] KAFKA-1046 Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x; reviewed by Neha and Jun
Date Fri, 13 Sep 2013 22:37:31 GMT
Updated Branches:
  refs/heads/trunk fed901cad -> 324936609


http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 4ed88e8..df90695 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -30,7 +30,7 @@ import scala.Some
 import kafka.server.KafkaConfig
 
 class LogTest extends JUnitSuite {
-  
+
   var logDir: File = null
   val time = new MockTime
   var config: KafkaConfig = null
@@ -46,7 +46,7 @@ class LogTest extends JUnitSuite {
   def tearDown() {
     Utils.rm(logDir)
   }
-  
+
   def createEmptyLogs(dir: File, offsets: Int*) {
     for(offset <- offsets) {
       Log.logFilename(dir, offset).createNewFile()
@@ -168,19 +168,19 @@ class LogTest extends JUnitSuite {
     val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1))
     assertEquals("Should be no more messages", 0, lastRead.size)
   }
-  
+
   /** Test the case where we have compressed batches of messages */
   @Test
   def testCompressedMessages() {
     /* this log should roll after every messageset */
     val log = new Log(logDir, 10, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
-    
+
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
-    
+
     def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).head.message)
-    
+
     /* we should always get the first message in the compressed set when reading any offset in the set */
     assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset)
     assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset)
@@ -202,7 +202,7 @@ class LogTest extends JUnitSuite {
     assertContains(makeRanges(5,8), 5)
     assertContains(makeRanges(5,8), 6)
   }
-  
+
   @Test
   def testEdgeLogRollsStartingAtZero() {
     // first test a log segment starting at 0
@@ -226,7 +226,7 @@ class LogTest extends JUnitSuite {
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(i.toString.getBytes))
     val curOffset = log.logEndOffset
-    
+
     // time goes by; the log file is deleted
     log.markDeletedWhile(_ => true)
 
@@ -262,7 +262,7 @@ class LogTest extends JUnitSuite {
         case e:MessageSizeTooLargeException => // this is good
     }
   }
-  
+
   @Test
   def testLogRecoversToCorrectOffset() {
     val numMessages = 100
@@ -276,15 +276,15 @@ class LogTest extends JUnitSuite {
     val lastIndexOffset = log.segments.view.last.index.lastOffset
     val numIndexEntries = log.segments.view.last.index.entries
     log.close()
-    
+
     // test non-recovery case
     log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
     assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
     assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
     assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
     log.close()
-    
-    // test 
+
+    // test
     log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
     assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
     assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
@@ -305,10 +305,10 @@ class LogTest extends JUnitSuite {
 
     for (i<- 1 to msgPerSeg)
       log.append(set)
-    
+
     assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
     assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset)
-    
+
     val lastOffset = log.logEndOffset
     val size = log.size
     log.truncateTo(log.logEndOffset) // keep the entire log
@@ -326,7 +326,7 @@ class LogTest extends JUnitSuite {
 
     for (i<- 1 to msgPerSeg)
       log.append(set)
-    
+
     assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
     assertEquals("Should be back to original size", log.size, size)
     log.truncateAndStartWithNewOffset(log.logEndOffset - (msgPerSeg - 1))
@@ -371,14 +371,14 @@ class LogTest extends JUnitSuite {
   def testAppendWithoutOffsetAssignment() {
     for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) {
       logDir.mkdir()
-      var log = new Log(logDir, 
-                        maxLogFileSize = 64*1024, 
+      var log = new Log(logDir,
+                        maxLogFileSize = 64*1024,
                         maxMessageSize = config.messageMaxBytes,
-                        maxIndexSize = 1000, 
-                        indexIntervalBytes = 10000, 
+                        maxIndexSize = 1000,
+                        indexIntervalBytes = 10000,
                         needsRecovery = true)
       val messages = List("one", "two", "three", "four", "five", "six")
-      val ms = new ByteBufferMessageSet(compressionCodec = codec, 
+      val ms = new ByteBufferMessageSet(compressionCodec = codec,
                                         offsetCounter = new AtomicLong(0),
                                         messages = messages.map(s => new Message(s.getBytes)):_*)
       val firstOffset = ms.toList.head.offset
@@ -391,7 +391,7 @@ class LogTest extends JUnitSuite {
       log.delete()
     }
   }
-  
+
   /**
    * When we open a log any index segments without an associated log segment should be deleted.
    */
@@ -399,22 +399,22 @@ class LogTest extends JUnitSuite {
   def testBogusIndexSegmentsAreRemoved() {
     val bogusIndex1 = Log.indexFilename(logDir, 0)
     val bogusIndex2 = Log.indexFilename(logDir, 5)
-    
+
     val set = TestUtils.singleMessageSet("test".getBytes())
-    val log = new Log(logDir, 
-                      maxLogFileSize = set.sizeInBytes * 5, 
+    val log = new Log(logDir,
+                      maxLogFileSize = set.sizeInBytes * 5,
                       maxMessageSize = config.messageMaxBytes,
-                      maxIndexSize = 1000, 
-                      indexIntervalBytes = 1, 
+                      maxIndexSize = 1000,
+                      indexIntervalBytes = 1,
                       needsRecovery = false)
-    
+
     assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length
> 0)
     assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
-    
+
     // check that we can append to the log
     for(i <- 0 until 10)
       log.append(set)
-      
+
     log.delete()
   }
 
@@ -423,38 +423,38 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes())
 
     // create a log
-    var log = new Log(logDir, 
-                      maxLogFileSize = set.sizeInBytes * 5, 
+    var log = new Log(logDir,
+                      maxLogFileSize = set.sizeInBytes * 5,
                       maxMessageSize = config.messageMaxBytes,
-                      maxIndexSize = 1000, 
-                      indexIntervalBytes = 10000, 
+                      maxIndexSize = 1000,
+                      indexIntervalBytes = 10000,
                       needsRecovery = true)
-    
+
     // add enough messages to roll over several segments then close and re-open and attempt to truncate
     for(i <- 0 until 100)
       log.append(set)
     log.close()
-    log = new Log(logDir, 
-                  maxLogFileSize = set.sizeInBytes * 5, 
+    log = new Log(logDir,
+                  maxLogFileSize = set.sizeInBytes * 5,
                   maxMessageSize = config.messageMaxBytes,
-                  maxIndexSize = 1000, 
-                  indexIntervalBytes = 10000, 
+                  maxIndexSize = 1000,
+                  indexIntervalBytes = 10000,
                   needsRecovery = true)
     log.truncateTo(3)
     assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
     assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
   }
-  
+
   def assertContains(ranges: Array[Range], offset: Long) = {
     Log.findRange(ranges, offset) match {
-      case Some(range) => 
+      case Some(range) =>
         assertTrue(range + " does not contain " + offset, range.contains(offset))
       case None => fail("No range found, but expected to find " + offset)
     }
   }
-  
+
   class SimpleRange(val start: Long, val size: Long) extends Range
-  
+
   def makeRanges(breaks: Int*): Array[Range] = {
     val list = new ArrayList[Range]
     var prior = 0
@@ -464,5 +464,5 @@ class LogTest extends JUnitSuite {
     }
     list.toArray(new Array[Range](list.size))
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
index fe5bc09..7df7405 100644
--- a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
@@ -30,14 +30,15 @@ class KafkaTimerTest extends JUnit3Suite {
     val clock = new ManualClock
     val testRegistry = new MetricsRegistry(clock)
     val metric = testRegistry.newTimer(this.getClass, "TestTimer")
+    val Epsilon = java.lang.Double.longBitsToDouble(0x3ca0000000000000L)
 
     val timer = new KafkaTimer(metric)
     timer.time {
       clock.addMillis(1000)
     }
     assertEquals(1, metric.count())
-    assertTrue((metric.max() - 1000).abs <= Double.Epsilon)
-    assertTrue((metric.min() - 1000).abs <= Double.Epsilon)
+    assertTrue((metric.max() - 1000).abs <= Epsilon)
+    assertTrue((metric.min() - 1000).abs <= Epsilon)
   }
 
   private class ManualClock extends Clock {
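
Background on the hunk above: Scala 2.10 drops scala.Double.Epsilon (deprecated since the 2.9 line), which is why the test now inlines the constant through longBitsToDouble. As a standalone sketch, not part of the patch, the bit pattern can be checked to reproduce the old value: the biased exponent field 0x3ca is 970 decimal, which decodes to 2^(970 - 1023) = 2^-53.

object EpsilonCheck extends App {
  // Same bit pattern the patch inlines: sign 0, biased exponent 0x3ca (= 970), zero mantissa.
  val Epsilon = java.lang.Double.longBitsToDouble(0x3ca0000000000000L)
  println(Epsilon)                     // 1.1102230246251565E-16
  println(Epsilon == math.pow(2, -53)) // true
}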

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 1781bc0..69c88c7 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -263,7 +263,7 @@ class AsyncProducerTest extends JUnit3Suite {
     }
     catch {
       // should not throw any exception
-      case e => fail("Should not throw any exception")
+      case e: Throwable => fail("Should not throw any exception")
 
     }
   }
@@ -450,7 +450,8 @@ class AsyncProducerTest extends JUnit3Suite {
     val topic = "topic1"
     val msgs = TestUtils.getMsgStrings(5)
     val scalaProducerData = msgs.map(m => new KeyedMessage[String, String](topic, m))
-    val javaProducerData = scala.collection.JavaConversions.asList(scalaProducerData)
+    import scala.collection.JavaConversions._
+    val javaProducerData: java.util.List[KeyedMessage[String, String]] = scalaProducerData
 
     val mockScalaProducer = EasyMock.createMock(classOf[kafka.producer.Producer[String, String]])
     mockScalaProducer.send(scalaProducerData.head)
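
Two 2.10 incompatibilities show up in this file. First, a bare "case e =>" in a catch block draws a new warning under 2.10, hence the explicit "case e: Throwable" (the same change recurs in the files below). Second, calling JavaConversions.asList by name, which newer Scala versions deprecate and then remove, is replaced by importing the implicit conversions so that a Scala Seq converts to java.util.List at the type ascription. A minimal sketch of that pattern, with illustrative names that are not from the patch:

import scala.collection.JavaConversions._

object ConversionExample extends App {
  val scalaSide: Seq[String] = Seq("one", "two")
  // With the implicits in scope, the ascription below triggers the
  // Seq => java.util.List conversion across the targeted Scala versions.
  val javaSide: java.util.List[String] = scalaSide
  println(javaSide.size) // 2
}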

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 29331db..2cabfbb 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -108,7 +108,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
       fail("Test should fail because the broker list provided are not valid")
     } catch {
       case e: FailedToSendMessageException =>
-      case oe => fail("fails with exception", oe)
+      case oe: Throwable => fail("fails with exception", oe)
     } finally {
       producer1.close()
     }
@@ -121,7 +121,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     try{
       producer2.send(new KeyedMessage[String, String](topic, "test", "test1"))
     } catch {
-      case e => fail("Should succeed sending the message", e)
+      case e: Throwable => fail("Should succeed sending the message", e)
     } finally {
       producer2.close()
     }
@@ -134,7 +134,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     try{
       producer3.send(new KeyedMessage[String, String](topic, "test", "test1"))
     } catch {
-      case e => fail("Should succeed sending the message", e)
+      case e: Throwable => fail("Should succeed sending the message", e)
     } finally {
       producer3.close()
     }
@@ -191,7 +191,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     }
     catch {
       case se: FailedToSendMessageException => true
-      case e => fail("Not expected", e)
+      case e: Throwable => fail("Not expected", e)
     }
     finally {
       producer2.close()
@@ -225,7 +225,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
       // on broker 0
       producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
     } catch {
-      case e => fail("Unexpected exception: " + e)
+      case e: Throwable => fail("Unexpected exception: " + e)
     }
 
     // kill the broker
@@ -238,7 +238,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
       fail("Should fail since no leader exists for the partition.")
     } catch {
       case e : TestFailedException => throw e // catch and re-throw the failure message
-      case e2 => // otherwise success
+      case e2: Throwable => // otherwise success
     }
 
     // restart server 1
@@ -287,7 +287,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
       assertTrue("Message set should have 1 message", messageSet1.hasNext)
       assertEquals(new Message("test".getBytes), messageSet1.next.message)
     } catch {
-      case e => case e: Exception => producer.close; fail("Not expected", e)
+      case e: Throwable => case e: Exception => producer.close; fail("Not expected", e)
     }
 
     // stop IO threads and request handling, but leave networking operational
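
The hunks in this file are the same mechanical change: the untyped catch-all "case e =>" becomes "case e: Throwable =>". Scala 2.10 warns on the untyped form because it silently catches everything, including scala.util.control.ControlThrowable, which the break and continuation machinery relies on. A minimal sketch of the pattern, assuming a toy safeDiv function that is not from the patch:

object CatchAllExample extends App {
  def safeDiv(a: Int, b: Int): Int =
    try { a / b }
    catch {
      case e: ArithmeticException => 0 // expected failure: divide by zero
      case e: Throwable => throw e     // anything unexpected propagates
    }

  println(safeDiv(10, 2)) // 5
  println(safeDiv(1, 0))  // 0
}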

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index b3e89c3..3592bff 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -136,7 +136,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
         new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
     } catch {
       case e : java.io.IOException => // success
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
 
@@ -205,7 +205,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
       Assert.fail("Should have received timeout exception since request handling is stopped.")
     } catch {
       case e: SocketTimeoutException => /* success */
-      case e => Assert.fail("Unexpected exception when expecting timeout: " + e)
+      case e: Throwable => Assert.fail("Unexpected exception when expecting timeout: " + e)
     }
     val t2 = SystemTime.milliseconds
     // make sure we don't wait fewer than timeoutMs for a response

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 830608f..ee591d0 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -410,7 +410,7 @@ object TestUtils extends Logging {
           ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
             ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch))
         } catch {
-          case oe => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe)
+          case oe: Throwable => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
index 3158a22..ec3cd29 100644
--- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
@@ -175,7 +175,7 @@ object ConsumerPerformance {
         case _: InterruptedException =>
         case _: ClosedByInterruptException =>
         case _: ConsumerTimeoutException =>
-        case e => throw e
+        case e: Throwable => throw e
       }
       totalMessagesRead.addAndGet(messagesRead)
       totalBytesRead.addAndGet(bytesRead)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index b3858f3..2cdbc9e 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -41,7 +41,8 @@ object KafkaBuild extends Build {
   </license>
 </licenses>,
     scalacOptions ++= Seq("-deprecation", "-unchecked", "-g:none"),
-    crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2"),
+    crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2", "2.10.1"),
+    excludeFilter in unmanagedSources <<= scalaVersion(v => if (v.startsWith("2.8")) "*_2.9+.scala" else "*_2.8.scala"),
     scalaVersion := "2.8.0",
     version := "0.8.0-beta1",
     publishTo := Some("Apache Maven Repo" at "https://repository.apache.org/service/local/staging/deploy/maven2"),
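
The build change does two things: it adds 2.10.1 to crossScalaVersions, and it keys an excludeFilter on the Scala version so that version-specific source files compile only for the matching Scala line (under 2.8.x the *_2.9+.scala shims are excluded, otherwise the *_2.8.scala ones are). The selection rule reads as a plain function; this sketch merely restates the predicate outside sbt, under an illustrative name:

object ShimFilter extends App {
  // Mirrors the excludeFilter in the patch: the returned glob names the
  // shim files dropped from the compile path for a given Scala version.
  def excludedPattern(scalaVersion: String): String =
    if (scalaVersion.startsWith("2.8")) "*_2.9+.scala" else "*_2.8.scala"

  println(excludedPattern("2.8.0"))  // *_2.9+.scala
  println(excludedPattern("2.10.1")) // *_2.8.scala
}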

