kafka-commits mailing list archives

From: nehanarkh...@apache.org
Subject: svn commit: r1295861 [3/3] - in /incubator/kafka/branches/0.8: core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/src/main/scala/...
Date: Thu, 01 Mar 2012 21:15:28 GMT
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Thu Mar  1 21:15:26 2012
@@ -17,20 +17,21 @@
 
 package kafka.producer
 
-import junit.framework.Assert._
-import java.util.Properties
-import kafka.api.FetchRequestBuilder
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
 import kafka.consumer.SimpleConsumer
+import org.I0Itec.zkclient.ZkClient
+import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
+import java.util.Properties
+import org.apache.log4j.{Level, Logger}
+import org.junit.Test
+import kafka.utils.{TestZKUtils, Utils, TestUtils}
 import kafka.message.Message
-import kafka.serializer.Encoder
-import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
-import kafka.utils.{TestUtils, TestZKUtils, Utils}
-import kafka.zk.EmbeddedZookeeper
-import org.apache.log4j.{Logger, Level}
-import org.junit.{After, Before, Test}
-import org.scalatest.junit.JUnitSuite
+import kafka.admin.CreateTopicCommand
+import kafka.api.FetchRequestBuilder
+import org.junit.Assert._
 
-class ProducerTest extends JUnitSuite {
+class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   private val topic = "test-topic"
   private val brokerId1 = 0
   private val brokerId2 = 1  
@@ -40,13 +41,13 @@ class ProducerTest extends JUnitSuite {
   private var server2: KafkaServer = null
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
-  private var zkServer:EmbeddedZookeeper = null
   private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
+  private var zkClient: ZkClient = null
 
-  @Before
-  def setUp() {
+  override def setUp() {
+    super.setUp()
     // set up 2 brokers with 4 partitions each
-    zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect)
+    zkClient = zookeeper.client
 
     val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
     val config1 = new KafkaConfig(props1) {
@@ -73,8 +74,7 @@ class ProducerTest extends JUnitSuite {
     Thread.sleep(500)
   }
 
-  @After
-  def tearDown() {
+  override def tearDown() {
     // restore set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.ERROR)
     server1.shutdown
@@ -82,38 +82,43 @@ class ProducerTest extends JUnitSuite {
     Utils.rm(server1.config.logDir)
     Utils.rm(server2.config.logDir)    
     Thread.sleep(500)
-    zkServer.shutdown
-    Thread.sleep(500)
+    super.tearDown()
   }
 
   @Test
   def testZKSendToNewTopic() {
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     val config = new ProducerConfig(props)
 
     val producer = new Producer[String, String](config)
     try {
-      // Available broker id, partition id at this stage should be (0,0), (1,0)
-      // this should send the message to broker 0 on partition 0
+      // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0, but
+      // since partition 0 can exist on any of the two brokers, we need to fetch from both brokers
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
       Thread.sleep(100)
-      // Available broker id, partition id at this stage should be (0,0), (0,1), (0,2), (0,3), (1,0)
-      // Since 4 % 5 = 4, this should send the message to broker 1 on partition 0
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-      Thread.sleep(100)
-      // cross check if brokers got the messages
+      Thread.sleep(1000)
+      // cross check if one of the brokers got the messages
       val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet1 = response1.messageSet("new-topic", 0)
-      assertTrue("Message set should have 1 message", messageSet1.iterator.hasNext)
-      assertEquals(new Message("test1".getBytes), messageSet1.head.message)
+      val messageSet1 = response1.messageSet("new-topic", 0).iterator
       val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet2 = response2.messageSet("new-topic", 0)
-      assertTrue("Message set should have 1 message", messageSet2.iterator.hasNext)
-      assertEquals(new Message("test1".getBytes), messageSet2.head.message)
+      val messageSet2 = response2.messageSet("new-topic", 0).iterator
+      assertTrue("Message set should have 1 message", (messageSet1.hasNext || messageSet2.hasNext))
+
+      if(messageSet1.hasNext) {
+        assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+        assertTrue("Message set should have 1 message", messageSet1.hasNext)
+        assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+      }
+      else {
+        assertEquals(new Message("test1".getBytes), messageSet2.next.message)
+        assertTrue("Message set should have 1 message", messageSet2.hasNext)
+        assertEquals(new Message("test1".getBytes), messageSet2.next.message)
+      }
     } catch {
       case e: Exception => fail("Not expected", e)
     }
@@ -124,34 +129,40 @@ class ProducerTest extends JUnitSuite {
   def testZKSendWithDeadBroker() {
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
+    // create topic
+    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
+
     val config = new ProducerConfig(props)
 
     val producer = new Producer[String, String](config)
     try {
-      // Available broker id, partition id at this stage should be (0,0), (1,0)
-      // Hence, this should send the message to broker 0 on partition 0
+      // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
+      // all partitions have broker 0 as the leader.
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
       Thread.sleep(100)
       // kill 2nd broker
-      server2.shutdown
+      server1.shutdown
       Thread.sleep(100)
-      // Available broker id, partition id at this stage should be (0,0), (0,1), (0,2), (0,3), (1,0)
-      // Since 4 % 5 = 4, in a normal case, it would send to broker 1 on partition 0. But since broker 1 is down,
-      // 4 % 4 = 0, So it should send the message to broker 0 on partition 0
+
+      // Since all partitions are unavailable, this request will be dropped
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
       Thread.sleep(100)
+
+      // restart server 1
+      server1.startup()
+      Thread.sleep(100)
+
       // cross check if brokers got the messages
       val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet1Iter = response1.messageSet("new-topic", 0).iterator
-      assertTrue("Message set should have 1 message", messageSet1Iter.hasNext)
-      assertEquals(new Message("test1".getBytes), messageSet1Iter.next.message)
-      assertTrue("Message set should have another message", messageSet1Iter.hasNext)
-      assertEquals(new Message("test1".getBytes), messageSet1Iter.next.message)
+      val messageSet1 = response1.messageSet("new-topic", 0).iterator
+      assertTrue("Message set should have 1 message", messageSet1.hasNext)
+      assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+      assertFalse("Message set should have another message", messageSet1.hasNext)
     } catch {
-      case e: Exception => fail("Not expected")
+      case e: Exception => fail("Not expected", e)
     }
     producer.close
   }
@@ -160,7 +171,7 @@ class ProducerTest extends JUnitSuite {
   def testZKSendToExistingTopicWithNoBrokers() {
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     val config = new ProducerConfig(props)
@@ -168,19 +179,19 @@ class ProducerTest extends JUnitSuite {
     val producer = new Producer[String, String](config)
     var server: KafkaServer = null
 
+    // create topic
+    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
+
     try {
-      // shutdown server1
-      server1.shutdown
-      Thread.sleep(100)
-      // Available broker id, partition id at this stage should be (1,0)
-      // this should send the message to broker 1 on partition 0
+      // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
+      // all partitions have broker 0 as the leader.
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
       Thread.sleep(100)
       // cross check if brokers got the messages
-      val response1 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet1 = response1.messageSet("new-topic", 0)
-      assertTrue("Message set should have 1 message", messageSet1.iterator.hasNext)
-      assertEquals(new Message("test".getBytes), messageSet1.head.message)
+      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+      val messageSet1 = response1.messageSet("new-topic", 0).iterator
+      assertTrue("Message set should have 1 message", messageSet1.hasNext)
+      assertEquals(new Message("test".getBytes), messageSet1.next.message)
 
       // shutdown server2
       server2.shutdown
@@ -189,7 +200,7 @@ class ProducerTest extends JUnitSuite {
       Utils.rm(server2.config.logDir)
       Thread.sleep(100)
       // start it up again. So broker 2 exists under /broker/ids, but nothing exists under /broker/topics/new-topic
-      val props2 = TestUtils.createBrokerConfig(brokerId1, port1)
+      val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
       val config2 = new KafkaConfig(props2) {
         override val numPartitions = 4
       }
@@ -202,9 +213,9 @@ class ProducerTest extends JUnitSuite {
 
       // cross check if brokers got the messages
       val response2 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet2 = response2.messageSet("new-topic", 0)
-      assertTrue("Message set should have 1 message", messageSet2.iterator.hasNext)
-      assertEquals(new Message("test".getBytes), messageSet2.head.message)
+      val messageSet2 = response1.messageSet("new-topic", 0).iterator
+      assertTrue("Message set should have 1 message", messageSet2.hasNext)
+      assertEquals(new Message("test".getBytes), messageSet2.next.message)
 
     } catch {
       case e: Exception => fail("Not expected", e)
@@ -213,29 +224,5 @@ class ProducerTest extends JUnitSuite {
       producer.close
     }
   }
-
-}
-
-class StringSerializer extends Encoder[String] {
-  def toEvent(message: Message):String = message.toString
-  def toMessage(event: String):Message = new Message(event.getBytes)
-  def getTopic(event: String): String = event.concat("-topic")
-}
-
-class NegativePartitioner extends Partitioner[String] {
-  def partition(data: String, numPartitions: Int): Int = {
-    -1
-  }
-}
-
-class StaticPartitioner extends Partitioner[String] {
-  def partition(data: String, numPartitions: Int): Int = {
-    (data.length % numPartitions)
-  }
 }
 
-class HashPartitioner extends Partitioner[String] {
-  def partition(data: String, numPartitions: Int): Int = {
-    (data.hashCode % numPartitions)
-  }
-}
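
The refactor above drops the hand-rolled EmbeddedZookeeper setup in favour of the shared ZooKeeperTestHarness mixin, and topics are now created explicitly through CreateTopicCommand instead of relying on auto-creation. A minimal sketch of that pattern, assuming only what this diff already uses (JUnit3Suite's setUp/tearDown and the harness's zookeeper.client); the class name is illustrative, and the reading of createTopic's arguments as (partitions, replication factor, replica assignment) follows the values used in these tests:

    import org.scalatest.junit.JUnit3Suite
    import kafka.zk.ZooKeeperTestHarness
    import kafka.admin.CreateTopicCommand

    class ExampleZkTest extends JUnit3Suite with ZooKeeperTestHarness {
      override def setUp() {
        super.setUp()  // the harness starts an embedded ZooKeeper before each test
        // "new-topic": 4 partitions, replication factor 2, replicas on brokers 0 and 1
        CreateTopicCommand.createTopic(zookeeper.client, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
      }

      override def tearDown() {
        // shut down any brokers the test started, then let the harness stop ZooKeeper
        super.tearDown()
      }
    }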

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Thu Mar  1 21:15:26 2012
@@ -21,11 +21,11 @@ import junit.framework.Assert
 import kafka.server.KafkaConfig
 import kafka.common.MessageSizeTooLargeException
 import java.util.Properties
-import kafka.api.ProducerRequest
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
 import kafka.integration.KafkaServerTestHarness
 import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
 import org.scalatest.junit.JUnit3Suite
+import kafka.api.ProducerRequest
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private var messageBytes =  new Array[Byte](2);

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Thu Mar  1 21:15:26 2012
@@ -17,16 +17,17 @@
 package kafka.server
 
 import java.io.File
-import kafka.producer.{SyncProducer, SyncProducerConfig}
 import kafka.consumer.SimpleConsumer
 import java.util.Properties
 import org.junit.Test
 import junit.framework.Assert._
-import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.message.{Message, ByteBufferMessageSet}
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{TestUtils, Utils}
-import kafka.api.{FetchResponse, FetchRequestBuilder, FetchRequest}
+import kafka.producer._
+import kafka.admin.CreateTopicCommand
+import kafka.api.FetchRequestBuilder
 
 class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port = TestUtils.choosePort
@@ -38,26 +39,20 @@ class ServerShutdownTest extends JUnit3S
 
     val host = "localhost"
     val topic = "test"
-    val sent1 = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
-    val sent2 = new ByteBufferMessageSet(NoCompressionCodec, new Message("more".getBytes()), new Message("messages".getBytes()))
+    val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
+    val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes()))
 
     {
-      val producer = new SyncProducer(getProducerConfig(host,
-                                                        port,
-                                                        64*1024,
-                                                        100000,
-                                                        10000))
-      val consumer = new SimpleConsumer(host,
-                                        port,
-                                        1000000,
-                                        64*1024)
-
       val server = new KafkaServer(config)
       server.startup()
 
+      // create topic
+      CreateTopicCommand.createTopic(zookeeper.client, topic, 1, 1, "0")
+
+      val producer = new Producer[Int, Message](getProducerConfig(64*1024, 100000, 10000))
+
       // send some messages
-      producer.send(topic, sent1)
-      sent1.getBuffer.rewind
+      producer.send(new ProducerData[Int, Message](topic, 0, sent1))
 
       Thread.sleep(200)
       // do a clean shutdown
@@ -68,11 +63,7 @@ class ServerShutdownTest extends JUnit3S
 
 
     {
-      val producer = new SyncProducer(getProducerConfig(host,
-                                                        port,
-                                                        64*1024,
-                                                        100000,
-                                                        10000))
+      val producer = new Producer[Int, Message](getProducerConfig(64*1024, 100000, 10000))
       val consumer = new SimpleConsumer(host,
                                         port,
                                         1000000,
@@ -81,18 +72,19 @@ class ServerShutdownTest extends JUnit3S
       val server = new KafkaServer(config)
       server.startup()
 
-      // bring the server back again and read the messages
+      Thread.sleep(100)
+
       var fetchedMessage: ByteBufferMessageSet = null
       while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
         val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
         fetchedMessage = fetched.messageSet(topic, 0)
       }
-      TestUtils.checkEquals(sent1.iterator, fetchedMessage.iterator)
+      TestUtils.checkEquals(sent1.iterator, fetchedMessage.map(m => m.message).iterator)
       val newOffset = fetchedMessage.validBytes
 
       // send some more messages
-      producer.send(topic, sent2)
-      sent2.getBuffer.rewind
+      println("Sending messages to topic " + topic)
+      producer.send(new ProducerData[Int, Message](topic, 0, sent2))
 
       Thread.sleep(200)
 
@@ -101,7 +93,7 @@ class ServerShutdownTest extends JUnit3S
         val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
         fetchedMessage = fetched.messageSet(topic, 0)
       }
-      TestUtils.checkEquals(sent2.map(m => m.message).iterator, fetchedMessage.map(m => m.message).iterator)
+      TestUtils.checkEquals(sent2.iterator, fetchedMessage.map(m => m.message).iterator)
 
       server.shutdown()
       Utils.rm(server.config.logDir)
@@ -109,14 +101,14 @@ class ServerShutdownTest extends JUnit3S
 
   }
 
-  private def getProducerConfig(host: String, port: Int, bufferSize: Int, connectTimeout: Int,
-                                reconnectInterval: Int): SyncProducerConfig = {
+  private def getProducerConfig(bufferSize: Int, connectTimeout: Int,
+                                reconnectInterval: Int): ProducerConfig = {
     val props = new Properties()
-    props.put("host", host)
-    props.put("port", port.toString)
+    props.put("zk.connect", zkConnect)
+    props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
     props.put("buffer.size", bufferSize.toString)
     props.put("connect.timeout.ms", connectTimeout.toString)
     props.put("reconnect.interval", reconnectInterval.toString)
-    new SyncProducerConfig(props)
+    new ProducerConfig(props)
   }
 }
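
ServerShutdownTest now goes through the ZooKeeper-aware Producer rather than a raw SyncProducer: the config carries zk.connect instead of host/port, and kafka.utils.FixedValuePartitioner lets the Int key double as the target partition, so ProducerData[Int, Message](topic, 0, messages) lands on partition 0. A rough sketch of that wiring (the ZooKeeper address and topic name are placeholders taken from TestZKUtils and this test):

    import java.util.Properties
    import kafka.message.Message
    import kafka.producer.{Producer, ProducerConfig, ProducerData}

    val props = new Properties()
    props.put("zk.connect", "127.0.0.1:2182")                            // TestZKUtils.zookeeperConnect in these tests
    props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")  // partition(data: Int, n) == data
    val producer = new Producer[Int, Message](new ProducerConfig(props))

    // the Int key (0) is passed straight through as the partition id for topic "test"
    producer.send(new ProducerData[Int, Message]("test", 0, List(new Message("hello".getBytes()))))
    producer.close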

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Thu Mar  1 21:15:26 2012
@@ -32,6 +32,7 @@ import kafka.cluster.Broker
 import collection.mutable.ListBuffer
 import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
 import scala.collection.Map
+import kafka.serializer.Encoder
 
 /**
  * Utility functions to help with testing
@@ -134,6 +135,7 @@ object TestUtils {
     props.put("zk.sessiontimeout.ms", "400")
     props.put("zk.synctime.ms", "200")
     props.put("autocommit.interval.ms", "1000")
+    props.put("rebalance.retries.max", "4")
 
     props
   }
@@ -275,14 +277,13 @@ object TestUtils {
   /**
    * Create a producer for the given host and port
    */
-  def createProducer(host: String, port: Int): SyncProducer = {
+  def createProducer[K, V](zkConnect: String): Producer[K, V] = {
     val props = new Properties()
-    props.put("host", host)
-    props.put("port", port.toString)
+    props.put("zk.connect", zkConnect)
     props.put("buffer.size", "65536")
     props.put("connect.timeout.ms", "100000")
     props.put("reconnect.interval", "10000")
-    return new SyncProducer(new SyncProducerConfig(props))
+    new Producer[K, V](new ProducerConfig(props))
   }
 
   def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
@@ -308,6 +309,12 @@ object TestUtils {
     brokers
   }
 
+  def deleteBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
+    val brokers = ids.map(id => new Broker(id, "localhost" + System.currentTimeMillis(), "localhost", 6667))
+    brokers.foreach(b => ZkUtils.deletePath(zkClient, ZkUtils.BrokerIdsPath + "/" + b))
+    brokers
+  }
+
   def getConsumedMessages[T](nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[T]]]): List[T]= {
     var messages: List[T] = Nil
     for ((topic, messageStreams) <- topicMessageStreams) {
@@ -335,3 +342,31 @@ object TestUtils {
 object TestZKUtils {
   val zookeeperConnect = "127.0.0.1:2182"  
 }
+
+class StringSerializer extends Encoder[String] {
+  def toEvent(message: Message):String = message.toString
+  def toMessage(event: String):Message = new Message(event.getBytes)
+  def getTopic(event: String): String = event.concat("-topic")
+}
+
+class NegativePartitioner extends Partitioner[String] {
+  def partition(data: String, numPartitions: Int): Int = {
+    -1
+  }
+}
+
+class StaticPartitioner extends Partitioner[String] {
+  def partition(data: String, numPartitions: Int): Int = {
+    (data.length % numPartitions)
+  }
+}
+
+class HashPartitioner extends Partitioner[String] {
+  def partition(data: String, numPartitions: Int): Int = {
+    (data.hashCode % numPartitions)
+  }
+}
+
+class FixedValuePartitioner extends Partitioner[Int] {
+  def partition(data: Int, numPartitions: Int): Int = data
+}
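
The test partitioners (NegativePartitioner, StaticPartitioner, HashPartitioner and the new FixedValuePartitioner) now live in kafka.utils next to TestUtils, which is why the producer properties in the tests above reference "kafka.utils.StaticPartitioner". As a quick illustration of the routing they provide against the 4-partition topics these tests create (key values taken from the tests):

    // StaticPartitioner routes by key length; FixedValuePartitioner passes the key through unchanged
    new StaticPartitioner().partition("test", 4)    // "test".length  % 4 = 0 -> partition 0
    new StaticPartitioner().partition("test1", 4)   // "test1".length % 4 = 1 -> partition 1
    new FixedValuePartitioner().partition(2, 4)     // key 2                  -> partition 2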

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh Thu Mar  1 21:15:26 2012
@@ -68,7 +68,7 @@ readonly test_start_time="$(date +%s)"
 
 readonly num_msg_per_batch=500
 readonly batches_per_iteration=5
-readonly num_iterations=12
+readonly num_iterations=5
 
 readonly zk_source_port=2181
 readonly zk_mirror_port=2182


