kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1395729 [4/4] - in /incubator/kafka/branches/0.8: core/src/main/scala/kafka/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/...
Date Mon, 08 Oct 2012 19:13:27 GMT
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Mon Oct  8 19:13:24 2012
@@ -22,7 +22,7 @@ import kafka.admin.CreateTopicCommand
 import kafka.utils.TestUtils._
 import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
-import kafka.message.Message
+import kafka.message.{Message, MessageSet, ByteBufferMessageSet}
 import kafka.producer.{ProducerConfig, ProducerData, Producer}
 
 class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -36,20 +36,13 @@ class LogRecoveryTest extends JUnit3Suit
   val topic = "new-topic"
   val partitionId = 0
 
-  val brokerId1 = 0
-  val brokerId2 = 1
-
-  val port1 = TestUtils.choosePort()
-  val port2 = TestUtils.choosePort()
-
   var server1: KafkaServer = null
   var server2: KafkaServer = null
 
   val configProps1 = configs.head
   val configProps2 = configs.last
 
-  val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
-  val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes()))
+  val message = new Message("hello".getBytes())
 
   var producer: Producer[Int, Message] = null
   var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDir)
@@ -76,18 +69,20 @@ class LogRecoveryTest extends JUnit3Suit
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1)
== 1))
 
-    sendMessages(2)
+    val numMessages = 2L
+    sendMessages(numMessages.toInt)
 
-    // give some time for the follower 1 to record leader HW of 60
-    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(()
=>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark == 60L, 1000))
+    // give some time for the follower 1 to record leader HW
+    assertTrue("Failed to update highwatermark for follower after 1000 ms", 
+               TestUtils.waitUntilTrue(() =>
+                 server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages, 10000))
 
     servers.foreach(server => server.replicaManager.checkpointHighWatermarks())
     producer.close()
     val leaderHW = hwFile1.read(topic, 0)
-    assertEquals(60L, leaderHW)
+    assertEquals(numMessages, leaderHW)
     val followerHW = hwFile2.read(topic, 0)
-    assertEquals(60L, followerHW)
+    assertEquals(numMessages, followerHW)
     servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDir)})
   }
 
@@ -110,14 +105,16 @@ class LogRecoveryTest extends JUnit3Suit
     assertTrue("Leader should get elected", leader.isDefined)
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1)
== 1))
-
+    
     assertEquals(0L, hwFile1.read(topic, 0))
 
-    sendMessages()
+    sendMessages(1)
+    Thread.sleep(1000)
+    var hw = 1L
 
     // kill the server hosting the preferred replica
     server1.shutdown()
-    assertEquals(30L, hwFile1.read(topic, 0))
+    assertEquals(hw, hwFile1.read(topic, 0))
 
     // check if leader moves to the other server
     leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
@@ -130,25 +127,27 @@ class LogRecoveryTest extends JUnit3Suit
     assertTrue("Leader must remain on broker 1, in case of zookeeper session expiration it
can move to broker 0",
       leader.isDefined && (leader.get == 0 || leader.get == 1))
 
-    assertEquals(30L, hwFile1.read(topic, 0))
+    assertEquals(hw, hwFile1.read(topic, 0))
     // since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet
     server2.shutdown()
-    assertEquals(30L, hwFile2.read(topic, 0))
+    assertEquals(hw, hwFile2.read(topic, 0))
 
     server2.startup()
     leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
     assertTrue("Leader must remain on broker 0, in case of zookeeper session expiration it
can move to broker 1",
       leader.isDefined && (leader.get == 0 || leader.get == 1))
 
-    sendMessages()
+    sendMessages(1)
+    hw += 1
+      
     // give some time for follower 1 to record leader HW of 60
     assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(()
=>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark == 60L, 1000))
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 2000))
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
     producer.close()
-    assertEquals(60L, hwFile1.read(topic, 0))
-    assertEquals(60L, hwFile2.read(topic, 0))
+    assertEquals(hw, hwFile1.read(topic, 0))
+    assertEquals(hw, hwFile2.read(topic, 0))
     servers.foreach(server => Utils.rm(server.config.logDir))
   }
 
@@ -183,16 +182,17 @@ class LogRecoveryTest extends JUnit3Suit
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1)
== 1))
     sendMessages(20)
+    var hw = 20L
     // give some time for follower 1 to record leader HW of 600
     assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(()
=>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark == 600L, 1000))
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 1000))
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
     producer.close()
     val leaderHW = hwFile1.read(topic, 0)
-    assertEquals(600L, leaderHW)
+    assertEquals(hw, leaderHW)
     val followerHW = hwFile2.read(topic, 0)
-    assertEquals(600L, followerHW)
+    assertEquals(hw, followerHW)
     servers.foreach(server => Utils.rm(server.config.logDir))
   }
 
@@ -228,43 +228,46 @@ class LogRecoveryTest extends JUnit3Suit
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1)
== 1))
 
     sendMessages(2)
+    var hw = 2L
+    
     // allow some time for the follower to get the leader HW
     assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(()
=>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark == 60L, 1000))
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 1000))
     // kill the server hosting the preferred replica
     server1.shutdown()
     server2.shutdown()
-    assertEquals(60L, hwFile1.read(topic, 0))
-    assertEquals(60L, hwFile2.read(topic, 0))
+    assertEquals(hw, hwFile1.read(topic, 0))
+    assertEquals(hw, hwFile2.read(topic, 0))
 
     server2.startup()
     // check if leader moves to the other server
     leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
-    assertEquals(60L, hwFile1.read(topic, 0))
+    assertEquals(hw, hwFile1.read(topic, 0))
 
     // bring the preferred replica back
     server1.startup()
 
-    assertEquals(60L, hwFile1.read(topic, 0))
-    assertEquals(60L, hwFile2.read(topic, 0))
+    assertEquals(hw, hwFile1.read(topic, 0))
+    assertEquals(hw, hwFile2.read(topic, 0))
 
     sendMessages(2)
+    hw += 2
+    
     // allow some time for the follower to get the leader HW
     assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(()
=>
-      server1.replicaManager.getReplica(topic, 0).get.highWatermark == 120L, 1000))
+      server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 1000))
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
     producer.close()
-    assertEquals(120L, hwFile1.read(topic, 0))
-    assertEquals(120L, hwFile2.read(topic, 0))
+    assertEquals(hw, hwFile1.read(topic, 0))
+    assertEquals(hw, hwFile2.read(topic, 0))
     servers.foreach(server => Utils.rm(server.config.logDir))
   }
 
-  private def sendMessages(numMessages: Int = 1) {
-    for(i <- 0 until numMessages) {
-      producer.send(new ProducerData[Int, Message](topic, 0, sent1))
-    }
+  private def sendMessages(n: Int = 1) {
+    for(i <- 0 until n)
+      producer.send(new ProducerData[Int, Message](topic, 0, message))
   }
 }
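
The substance of the LogRecoveryTest change above is that the expected high-watermark values are no longer hard-coded (30L, 60L, 120L, 600L) but derived from the number of messages sent, in line with 0.8's move from byte positions to logical message offsets: each single-message send advances the HW by one. A minimal sketch of that counting pattern, assuming the test fixtures shown above (producer, topic, message) are in scope; the helper name is illustrative:

    // Sketch only: mirrors the pattern introduced by this patch, assuming the
    // LogRecoveryTest fixtures above (producer, topic, message) are in scope.
    import kafka.message.Message
    import kafka.producer.ProducerData

    private var expectedHw = 0L   // expected logical high watermark

    private def sendAndAdvanceHw(n: Int) {
      for (i <- 0 until n)
        producer.send(new ProducerData[Int, Message](topic, 0, message))
      expectedHw += n             // one message per send, so the HW grows by one each time
    }

    // later, after replication and checkpointing:
    //   assertEquals(expectedHw, hwFile1.read(topic, 0))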

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala Mon Oct  8 19:13:24 2012
@@ -60,7 +60,7 @@ class ReplicaFetchTest extends JUnit3Sui
                   new ProducerData[String, String](topic2, testMessageList2))
     producer.close()
 
-    def condition(): Boolean = {
+    def logsMatch(): Boolean = {
       var result = true
       for (topic <- List(topic1, topic2)) {
         val expectedOffset = brokers.head.getLogManager().getLog(topic, partition).get.logEndOffset
@@ -69,6 +69,6 @@ class ReplicaFetchTest extends JUnit3Sui
       }
       result
     }
-    assertTrue("broker logs should be identical", waitUntilTrue(condition, 6000))
+    assertTrue("Broker logs should be identical", waitUntilTrue(logsMatch, 6000))
   }
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Mon Oct  8 19:13:24 2012
@@ -80,7 +80,7 @@ class ServerShutdownTest extends JUnit3S
         fetchedMessage = fetched.messageSet(topic, 0)
       }
       TestUtils.checkEquals(sent1.iterator, fetchedMessage.map(m => m.message).iterator)
-      val newOffset = fetchedMessage.validBytes
+      val newOffset = fetchedMessage.last.nextOffset
 
       // send some more messages
       producer.send(new ProducerData[Int, Message](topic, 0, sent2))
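
The ServerShutdownTest change above swaps validBytes for last.nextOffset when computing where the next fetch should start: with logical offsets, the resume point is one past the last message received, not the number of bytes consumed. A hedged sketch of that idea, using an illustrative helper that is not part of the patch:

    // Illustrative only: derive the next fetch offset from the last message's
    // logical offset rather than from the byte size of the fetched data.
    import kafka.message.MessageSet

    def nextFetchOffset(fetched: MessageSet, currentOffset: Long): Long =
      if (fetched.iterator.hasNext) fetched.last.nextOffset  // offset of the last message + 1
      else currentOffset                                     // nothing fetched, keep the old offset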

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala Mon Oct  8 19:13:24 2012
@@ -54,6 +54,7 @@ class SimpleFetchTest extends JUnit3Suit
     val time = new MockTime
     val leo = 20
     val hw = 5
+    val fetchSize = 100
     val messages = new Message("test-message".getBytes())
 
     val zkClient = EasyMock.createMock(classOf[ZkClient])
@@ -61,7 +62,7 @@ class SimpleFetchTest extends JUnit3Suit
 
     val log = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
-    EasyMock.expect(log.read(0, hw)).andReturn(new ByteBufferMessageSet(messages))
+    EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn(new ByteBufferMessageSet(messages))
     EasyMock.replay(log)
 
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
@@ -92,9 +93,9 @@ class SimpleFetchTest extends JUnit3Suit
 
     // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
-      .replicaId(Request.NonFollowerId)
-      .addFetch(topic, partitionId, 0, hw*2)
-      .build()
+          .replicaId(Request.NonFollowerId)
+          .addFetch(topic, partitionId, 0, fetchSize)
+          .build()
     val goodFetchBB = TestUtils.createRequestByteBuffer(goodFetch)
 
     // send the request
@@ -156,7 +157,7 @@ class SimpleFetchTest extends JUnit3Suit
 
     val log = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
-    EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE)).andReturn(new ByteBufferMessageSet(messages))
+    EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE, None)).andReturn(new ByteBufferMessageSet(messages))
     EasyMock.replay(log)
 
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
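
The mocked Log.read calls in SimpleFetchTest now match a read signature that takes a start offset, a maximum fetch size, and an optional upper bound: ordinary consumer fetches are capped at the high watermark (Some(hw)), while follower fetches pass None and may read through to the log end. A sketch of that distinction; the helper name and parameters are illustrative, not the actual broker code:

    // Sketch of the bound selection implied by the mocks above; only the
    // Some(hw) / None split is taken from the diff, the rest is illustrative.
    def readBound(isFollowerFetch: Boolean, highWatermark: Long): Option[Long] =
      if (isFollowerFetch) None       // followers may read past the HW up to the log end offset
      else Some(highWatermark)        // consumers never see messages beyond the committed HW

    // e.g. log.read(startOffset, fetchSize, readBound(isFollowerFetch, hw))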

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Mon Oct  8 19:13:24 2012
@@ -155,8 +155,8 @@ object TestUtils extends Logging {
    * Wrap the message in a message set
    * @param payload The bytes of the message
    */
-  def singleMessageSet(payload: Array[Byte]) =
-    new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(payload))
+  def singleMessageSet(payload: Array[Byte], codec: CompressionCodec = NoCompressionCodec) =
+    new ByteBufferMessageSet(compressionCodec = codec, messages = new Message(payload))
 
   /**
    * Generate an array of random bytes
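
The singleMessageSet helper above gains an optional compression codec that defaults to NoCompressionCodec, so existing call sites compile unchanged. A hedged usage example; GZIPCompressionCodec is assumed to be the gzip codec object in kafka.message at this revision:

    // Usage sketch for the new default parameter (GZIPCompressionCodec assumed
    // to be available in kafka.message).
    import kafka.message.GZIPCompressionCodec
    import kafka.utils.TestUtils

    val plain      = TestUtils.singleMessageSet("hello".getBytes)                       // NoCompressionCodec
    val compressed = TestUtils.singleMessageSet("hello".getBytes, GZIPCompressionCodec) // gzip-compressed set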
@@ -426,7 +426,30 @@ object TestUtils extends Logging {
       leaderLock.unlock()
     }
   }
+  
+  /**
+   * Execute the given block. If it throws an assertion error, retry. Repeat
+   * until no error is thrown or the time limit elapses.
+   */
+  def retry(waitTime: Long, block: () => Unit) {
+    val startTime = System.currentTimeMillis()
+    while(true) {
+      try {
+        block()
+        return   // the block completed without an assertion error, so stop retrying
+      } catch {
+        case e: AssertionError =>
+          if(System.currentTimeMillis - startTime > waitTime)
+            throw e
+          else
+            Thread.sleep(100)
+      }
+    }
+  }
 
+  /**
+   * Wait until the given condition is true or the given wait time elapses
+   */
   def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
     val startTime = System.currentTimeMillis()
     while (true) {
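
The new TestUtils.retry helper above keeps re-running a block of assertions, sleeping 100 ms between attempts, until the block passes or the time limit elapses, which suits conditions that only become true once asynchronous replication catches up. A self-contained usage sketch, assuming the kafka test classes are on the classpath; the flag and thread here are purely illustrative:

    // Usage sketch for TestUtils.retry: the assertion is retried every 100 ms
    // until it passes or 5 seconds elapse. The AtomicBoolean stands in for any
    // condition that becomes true asynchronously.
    import java.util.concurrent.atomic.AtomicBoolean
    import kafka.utils.TestUtils

    val caughtUp = new AtomicBoolean(false)
    new Thread(new Runnable { def run() { Thread.sleep(300); caughtUp.set(true) } }).start()

    TestUtils.retry(5000, () => {
      assert(caughtUp.get, "replica has not caught up yet")  // AssertionError triggers another attempt
    })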

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala Mon Oct  8 19:13:24 2012
@@ -59,7 +59,7 @@ object ConsumerPerformance {
         threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config,
                                               totalMessagesRead, totalBytesRead)
 
-    logger.info("Sleeping for 1000 seconds.")
+    logger.info("Sleeping for 1 second.")
     Thread.sleep(1000)
     logger.info("starting threads")
     val startMs = System.currentTimeMillis


