kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joest...@apache.org
Subject svn commit: r1344526 [3/3] - in /incubator/kafka/branches/0.8: ./ clients/cpp/src/ contrib/hadoop-consumer/src/main/java/kafka/etl/ contrib/hadoop-consumer/src/main/java/kafka/etl/impl/ contrib/hadoop-producer/ contrib/hadoop-producer/src/main/java/kaf...
Date Thu, 31 May 2012 01:51:27 GMT
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Thu May 31 01:51:23 2012
@@ -29,6 +29,8 @@ import scala.collection.mutable
 import kafka.message.{NoCompressionCodec, CompressionCodec}
 import org.I0Itec.zkclient.ZkClient
 import java.util.{Random, Properties}
+import joptsimple.{OptionSpec, OptionSet, OptionParser}
+
 
 /**
  * Helper functions!
@@ -250,13 +252,47 @@ object Utils extends Logging {
     else value
   }
 
-  def getLong(props: Properties, name: String, default: Long): Long =
+  def getLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = {
+    val value = buffer.getLong
+    if(value < range._1 || value > range._2)
+      throw new IllegalArgumentException(name + " has value " + value + " which is not in the range " + range + ".")
+    else value
+  }
+
+  /**
+   * Read a required long property value or throw an exception if no such property is found
+   */
+  def getLong(props: Properties, name: String): Long = {
+    if(props.containsKey(name))
+      return getLong(props, name, -1)
+    else
+      throw new IllegalArgumentException("Missing required property '" + name + "'")
+  }
+
+  /**
+   * Read an long from the properties instance
+   * @param props The properties to read from
+   * @param name The property name
+   * @param default The default value to use if the property is not found
+   * @return the long value
+   */
+  def getLong(props: Properties, name: String, default: Long): Long = 
     getLongInRange(props, name, default, (Long.MinValue, Long.MaxValue))
 
+  /**
+   * Read an long from the properties instance. Throw an exception 
+   * if the value is not in the given range (inclusive)
+   * @param props The properties to read from
+   * @param name The property name
+   * @param default The default value to use if the property is not found
+   * @param range The range in which the value must fall (inclusive)
+   * @throws IllegalArgumentException If the value is not in the given range
+   * @return the long value
+   */
   def getLongInRange(props: Properties, name: String, default: Long, range: (Long, Long)): Long = {
-    val v =
+    val v = 
       if(props.containsKey(name))
-        props.getProperty(name).toInt
+        props.getProperty(name).toLong
       else
         default
     if(v < range._1 || v > range._2)
@@ -265,14 +301,6 @@ object Utils extends Logging {
       v
   }
 
-
-  def getLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = {
-    val value = buffer.getLong
-    if(value < range._1 || value > range._2)
-      throw new IllegalArgumentException(name + " has value " + value + " which is not in the range " + range + ".")
-    else value
-  }
-
   /**
    * Read a boolean value from the properties instance
    * @param props The properties to read from
@@ -720,6 +748,28 @@ object Utils extends Logging {
     builder.append(" }")
     builder.toString
   }
+
+  def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
+    for(arg <- required) {
+      if(!options.has(arg)) {
+        error("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+  }
+
+  /**
+   * Create a circular (looping) iterator over a collection.
+   * @param coll An iterable over the underlying collection.
+   * @return A circular iterator over the collection.
+   */
+  def circularIterator[T](coll: Iterable[T]) = {
+    val stream: Stream[T] =
+      for (forever <- Stream.continually(1); t <- coll) yield t
+    stream.iterator
+  }
+
 }
 
 class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Thu May 31 01:51:23 2012
@@ -432,17 +432,11 @@ object ZkUtils extends Logging {
     getChildren(zkClient, dirs.consumerRegistryDir)
   }
 
-  def getTopicCount(zkClient: ZkClient, group: String, consumerId: String) : TopicCount = {
-    val dirs = new ZKGroupDirs(group)
-    val topicCountJson = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
-    TopicCount.constructTopicCount(consumerId, topicCountJson)
-  }
-
   def getConsumerTopicMaps(zkClient: ZkClient, group: String): Map[String, TopicCount] = {
     val dirs = new ZKGroupDirs(group)
     val consumersInGroup = getConsumersInGroup(zkClient, group)
     val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,
-      ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)))
+      ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId), zkClient))
     consumersInGroup.zip(topicCountMaps).toMap
   }
 
@@ -451,8 +445,8 @@ object ZkUtils extends Logging {
     val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir)
     val consumersPerTopicMap = new mutable.HashMap[String, List[String]]
     for (consumer <- consumers) {
-      val topicCount = getTopicCount(zkClient, group, consumer)
-      for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic()) {
+      val topicCount = TopicCount.constructTopicCount(group, consumer, zkClient)
+      for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) {
         for (consumerThreadId <- consumerThreadIdSet)
           consumersPerTopicMap.get(topic) match {
             case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala Thu May 31 01:51:23 2012
@@ -52,7 +52,6 @@ object TestEndToEndLatency {
     
     val message = new Message("hello there beautiful".getBytes)
     var totalTime = 0.0
-    var totalSize = 0L
     for(i <- 0 until numMessages) {
       var begin = System.nanoTime
       producer.send(new ProducerData(topic, message))
@@ -62,7 +61,6 @@ object TestEndToEndLatency {
       if(i % 10000 == 0)
         println(i + "\t" + ellapsed / 1000.0 / 1000.0)
       totalTime += ellapsed
-      totalSize += received.size
     }
     println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0)) + "ms"
     producer.close()

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala Thu May 31 01:51:23 2012
@@ -56,13 +56,13 @@ object TestZKConsumerOffsets {
   }
 }
 
-private class ConsumerThread(stream: KafkaMessageStream[Message]) extends Thread {
+private class ConsumerThread(stream: KafkaStream[Message]) extends Thread {
   val shutdownLatch = new CountDownLatch(1)
 
   override def run() {
     println("Starting consumer thread..")
-    for (message <- stream) {
-      println("consumed: " + Utils.toString(message.payload, "UTF-8"))
+    for (messageAndMetadata <- stream) {
+      println("consumed: " + Utils.toString(messageAndMetadata.message.payload, "UTF-8"))
     }
     shutdownLatch.countDown
     println("thread shutdown !" )

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala Thu May 31 01:51:23 2012
@@ -24,7 +24,7 @@ import kafka.cluster.Partition
 
 
 class TopicCountTest extends JUnitSuite {
-
+/*
   @Test
   def testBasic() {
     val consumer = "conusmer1"
@@ -40,7 +40,7 @@ class TopicCountTest extends JUnitSuite 
     val topicCount2 = TopicCount.constructTopicCount(consumer, expectedTopicCount.toJsonString)
     assertTrue(expectedTopicCount == topicCount2)
   }
-
+*/
   @Test
   def testPartition() {
     assertTrue(new Partition("foo", 10) == new Partition("foo", 10))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Thu May 31 01:51:23 2012
@@ -338,7 +338,7 @@ class ZookeeperConsumerConnectorTest ext
         val iterator = messageStream.iterator
         for (i <- 0 until nMessages * 2) {
           assertTrue(iterator.hasNext())
-          val message = iterator.next()
+          val message = iterator.next().message
           receivedMessages ::= message
           debug("received message: " + message)
         }
@@ -426,14 +426,14 @@ class ZookeeperConsumerConnectorTest ext
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 
-  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[Message]]]): List[Message]= {
+  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaStream[Message]]]): List[Message]= {
     var messages: List[Message] = Nil
     for ((topic, messageStreams) <- topicMessageStreams) {
       for (messageStream <- messageStreams) {
         val iterator = messageStream.iterator
         for (i <- 0 until nMessagesPerThread) {
           assertTrue(iterator.hasNext)
-          val message = iterator.next
+          val message = iterator.next.message
           messages ::= message
           debug("received message: " + Utils.toString(message.payload, "UTF-8"))
         }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala Thu May 31 01:51:23 2012
@@ -24,8 +24,6 @@ import kafka.api.{FetchRequestBuilder, O
 import kafka.consumer.SimpleConsumer
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-
-import org.apache.log4j.Logger
 import org.scalatest.junit.JUnit3Suite
 
 class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness {
@@ -45,8 +43,6 @@ class BackwardsCompatibilityTest extends
   val configs = List(new KafkaConfig(kafkaProps))
   var simpleConsumer: SimpleConsumer = null
 
-  private val logger = Logger.getLogger(getClass())
-
   override def setUp() {
     super.setUp()
     simpleConsumer = new SimpleConsumer(host, port, 1000000, 64*1024)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Thu May 31 01:51:23 2012
@@ -56,7 +56,7 @@ class FetcherTest extends JUnit3Suite wi
     super.setUp
     fetcher = new Fetcher(new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), null)
     fetcher.stopConnectionsToAllBrokers
-    fetcher.startConnections(topicInfos, cluster, null)
+    fetcher.startConnections(topicInfos, cluster)
   }
 
   override def tearDown() {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Thu May 31 01:51:23 2012
@@ -22,14 +22,15 @@ import kafka.integration.KafkaServerTest
 import kafka.server._
 import org.scalatest.junit.JUnit3Suite
 import scala.collection.JavaConversions._
-import kafka.consumer.{ConsumerConfig, KafkaMessageStream}
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
 import kafka.javaapi.producer.{ProducerData, Producer}
 import kafka.utils.TestUtils._
 import kafka.utils.{Utils, Logging, TestUtils}
+import kafka.consumer.{KafkaStream, ConsumerConfig}
+import kafka.zk.ZooKeeperTestHarness
 
-class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
 
   val zookeeperConnect = zkConnect
   val numNodes = 2
@@ -93,7 +94,7 @@ class ZookeeperConsumerConnectorTest ext
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 
-  def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaMessageStream[Message]]])
+  def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[Message]]])
   : List[Message]= {
     var messages: List[Message] = Nil
     val topicMessageStreams = asMap(jTopicMessageStreams)
@@ -102,7 +103,7 @@ class ZookeeperConsumerConnectorTest ext
         val iterator = messageStream.iterator
         for (i <- 0 until nMessagesPerThread) {
           assertTrue(iterator.hasNext)
-          val message = iterator.next
+          val message = iterator.next.message
           messages ::= message
           debug("received message: " + Utils.toString(message.payload, "UTF-8"))
         }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Thu May 31 01:51:23 2012
@@ -43,6 +43,7 @@ class LogManagerTest extends JUnit3Suite
     val props = TestUtils.createBrokerConfig(0, -1)
     config = new KafkaConfig(props) {
                    override val logFileSize = 1024
+                   override val flushInterval = 100
                  }
     logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false)
     logManager.startup
@@ -86,10 +87,13 @@ class LogManagerTest extends JUnit3Suite
       offset += set.sizeInBytes
     }
     log.flush
-    // Why this sleep is required ? File system takes some time to update the last modified time for a file.
-    // TODO: What is unknown is why 1 second or couple 100 milliseconds didn't work ?
-    Thread.sleep(2000)
+
     assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
+
+    // update the last modified time of all log segments
+    val logSegments = log.segments.view
+    logSegments.foreach(s => s.file.setLastModified(time.currentMs))
+
     time.currentMs += maxLogAge + 3000
     logManager.cleanupLogs()
     assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments)
@@ -114,8 +118,9 @@ class LogManagerTest extends JUnit3Suite
     Thread.sleep(100)
     config = new KafkaConfig(props) {
       override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
-      override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Int] // keep exactly 6 segments + 1 roll over
+      override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long] // keep exactly 6 segments + 1 roll over
       override val logRetentionHours = retentionHours
+      override val flushInterval = 100
     }
     logManager = new LogManager(config, time, veryLargeLogFlushInterval, retentionMs, false)
     logManager.startup
@@ -182,6 +187,7 @@ class LogManagerTest extends JUnit3Suite
     config = new KafkaConfig(props) {
                    override val logFileSize = 256
                    override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2")
+                   override val flushInterval = 100
                  }
 
     logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Thu May 31 01:51:23 2012
@@ -182,7 +182,10 @@ class LogTest extends JUnitSuite {
       assertEquals(curOffset, log.nextAppendOffset)
 
       // time goes by; the log file (which is empty) is deleted again
-      log.markDeletedWhile(_ => true)
+      val deletedSegments = log.markDeletedWhile(_ => true)
+
+      // we shouldn't delete the last empty log segment.
+      assertTrue(deletedSegments.size == 0)
 
       // we now have a new log
       assertEquals(curOffset, log.nextAppendOffset)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala Thu May 31 01:51:23 2012
@@ -19,19 +19,19 @@ package kafka.log4j
 
 import java.util.Properties
 import java.io.File
+import kafka.consumer.SimpleConsumer
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{TestUtils, TestZKUtils, Utils, Logging}
 import junit.framework.Assert._
 import kafka.api.FetchRequestBuilder
-import kafka.consumer.SimpleConsumer
 import kafka.message.Message
 import kafka.producer.async.MissingConfigException
 import kafka.serializer.Encoder
-import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.log4j.spi.LoggingEvent
 import org.apache.log4j.{PropertyConfigurator, Logger}
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnit3Suite
-import kafka.utils._
 
 class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Thu May 31 01:51:23 2012
@@ -94,6 +94,10 @@ class ByteBufferMessageSetTest extends B
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
       //make sure the last offset after iteration is correct
       assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit)
+
+      //make sure shallow iterator is the same as deep iterator
+      TestUtils.checkEquals[Message](TestUtils.getMessageIterator(messageSet.shallowIterator),
+                                     TestUtils.getMessageIterator(messageSet.iterator))
     }
 
     // test for compressed regular messages
@@ -104,6 +108,8 @@ class ByteBufferMessageSetTest extends B
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
       //make sure the last offset after iteration is correct
       assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit)
+
+      verifyShallowIterator(messageSet)
     }
 
     // test for mixed empty and non-empty messagesets uncompressed
@@ -121,6 +127,10 @@ class ByteBufferMessageSetTest extends B
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure the last offset after iteration is correct
       assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit)
+
+      //make sure shallow iterator is the same as deep iterator
+      TestUtils.checkEquals[Message](TestUtils.getMessageIterator(mixedMessageSet.shallowIterator),
+                                     TestUtils.getMessageIterator(mixedMessageSet.iterator))
     }
 
     // test for mixed empty and non-empty messagesets compressed
@@ -138,7 +148,15 @@ class ByteBufferMessageSetTest extends B
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure the last offset after iteration is correct
       assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit)
+
+      verifyShallowIterator(mixedMessageSet)
     }
   }
 
+  def verifyShallowIterator(messageSet: ByteBufferMessageSet) {
+      //make sure the offsets returned by a shallow iterator is a subset of that of a deep iterator
+      val shallowOffsets = messageSet.shallowIterator.map(msgAndOff => msgAndOff.offset).toSet
+      val deepOffsets = messageSet.iterator.map(msgAndOff => msgAndOff.offset).toSet
+      assertTrue(shallowOffsets.subsetOf(deepOffsets))
+  }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Thu May 31 01:51:23 2012
@@ -20,10 +20,10 @@ package kafka.network;
 import java.net._
 import java.io._
 import org.junit._
-import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import kafka.utils.TestUtils
 import java.util.Random
+import junit.framework.Assert._
 
 class SocketServerTest extends JUnitSuite {
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Thu May 31 01:51:23 2012
@@ -23,7 +23,7 @@ import junit.framework.Assert
 import kafka.admin.CreateTopicCommand
 import kafka.common.{ErrorMapping, MessageSizeTooLargeException}
 import kafka.integration.KafkaServerTestHarness
-import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, Message, ByteBufferMessageSet}
 import kafka.server.KafkaConfig
 import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
 import org.junit.Test
@@ -71,7 +71,7 @@ class SyncProducerTest extends JUnit3Sui
   }
 
   @Test
-  def testMessageSizeTooLarge() {
+  def testSingleMessageSizeTooLarge() {
     val server = servers.head
     val props = new Properties()
     props.put("host", "localhost")
@@ -91,6 +91,26 @@ class SyncProducerTest extends JUnit3Sui
   }
 
   @Test
+  def testCompressedMessageSizeTooLarge() {
+    val server = servers.head
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", server.socketServer.port.toString)
+    props.put("buffer.size", "102400")
+    props.put("connect.timeout.ms", "300")
+    props.put("reconnect.interval", "500")
+    props.put("max.message.size", "100")
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+    val bytes = new Array[Byte](101)
+    try {
+      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = new Message(bytes))))
+      Assert.fail("Message was too large to send, SyncProducer should have thrown exception for DefaultCompressionCodec.")
+    } catch {
+      case e: MessageSizeTooLargeException => /* success */
+    }
+  }
+
+  @Test
   def testProduceCorrectlyReceivesResponse() {
     val server = servers.head
     val props = new Properties()

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Thu May 31 01:51:23 2012
@@ -20,6 +20,7 @@ import java.io.File
 import kafka.consumer.SimpleConsumer
 import java.util.Properties
 import org.junit.Test
+import org.scalatest.junit.JUnitSuite
 import junit.framework.Assert._
 import kafka.message.{Message, ByteBufferMessageSet}
 import org.scalatest.junit.JUnit3Suite

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Thu May 31 01:51:23 2012
@@ -30,7 +30,7 @@ import kafka.message._
 import org.I0Itec.zkclient.ZkClient
 import kafka.cluster.Broker
 import collection.mutable.ListBuffer
-import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
+import kafka.consumer.ConsumerConfig
 import scala.collection.Map
 import kafka.serializer.Encoder
 import kafka.api.{ProducerRequest, TopicData, PartitionData}
@@ -318,21 +318,6 @@ object TestUtils extends Logging {
     brokers
   }
 
-  def getConsumedMessages[T](nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[T]]]): List[T]= {
-    var messages: List[T] = Nil
-    for ((topic, messageStreams) <- topicMessageStreams) {
-      for (messageStream <- messageStreams) {
-        val iterator = messageStream.iterator
-        for (i <- 0 until nMessagesPerThread) {
-          assertTrue(iterator.hasNext)
-          val message = iterator.next
-          messages ::= message
-        }
-      }
-    }
-    messages
-  }
-
   def getMsgStrings(n: Int): Seq[String] = {
     val buffer = new ListBuffer[String]
     for (i <- 0 until  n)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala Thu May 31 01:51:23 2012
@@ -20,6 +20,8 @@ package kafka.utils
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
+import org.junit.Assert._
+
 
 class UtilsTest extends JUnitSuite {
   
@@ -29,5 +31,24 @@ class UtilsTest extends JUnitSuite {
   def testSwallow() {
     Utils.swallow(logger.info, throw new IllegalStateException("test"))
   }
-  
+
+  @Test
+  def testCircularIterator() {
+    val l = List(1, 2)
+    val itl = Utils.circularIterator(l)
+    assertEquals(1, itl.next())
+    assertEquals(2, itl.next())
+    assertEquals(1, itl.next())
+    assertEquals(2, itl.next())
+    assertFalse(itl.hasDefiniteSize)
+
+    val s = Set(1, 2)
+    val its = Utils.circularIterator(s)
+    assertEquals(1, its.next())
+    assertEquals(2, its.next())
+    assertEquals(1, its.next())
+    assertEquals(2, its.next())
+    assertEquals(1, its.next())
+  }
+
 }

Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Consumer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Consumer.java?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Consumer.java (original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Consumer.java Thu May 31 01:51:23 2012
@@ -16,16 +16,17 @@
  */
 package kafka.examples;
 
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaMessageStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.Message;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.Message;
+
 
 public class Consumer extends Thread
 {
@@ -55,10 +56,10 @@ public class Consumer extends Thread
   public void run() {
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
     topicCountMap.put(topic, new Integer(1));
-    Map<String, List<KafkaMessageStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
-    KafkaMessageStream<Message> stream =  consumerMap.get(topic).get(0);
+    Map<String, List<KafkaStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+    KafkaStream<Message> stream =  consumerMap.get(topic).get(0);
     ConsumerIterator<Message> it = stream.iterator();
     while(it.hasNext())
-      System.out.println(ExampleUtils.getMessage(it.next()));
+      System.out.println(ExampleUtils.getMessage(it.next().message()));
   }
 }

Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/ExampleUtils.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/ExampleUtils.java?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/ExampleUtils.java (original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/ExampleUtils.java Thu May 31 01:51:23 2012
@@ -16,8 +16,8 @@
  */
 package kafka.examples;
 
-import java.nio.ByteBuffer;
 
+import java.nio.ByteBuffer;
 import kafka.message.Message;
 
 public class ExampleUtils

Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java (original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java Thu May 31 01:51:23 2012
@@ -16,9 +16,10 @@
  */
 package kafka.examples;
 
+
+import java.util.Properties;
 import kafka.javaapi.producer.ProducerData;
 import kafka.producer.ProducerConfig;
-import java.util.Properties;
 
 public class Producer extends Thread
 {

Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java (original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java Thu May 31 01:51:23 2012
@@ -19,17 +19,14 @@ package kafka.examples;
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.javaapi.FetchResponse;
+import java.util.ArrayList;
+import java.util.List;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.javaapi.message.MessageSet;
 import kafka.message.MessageAndOffset;
-
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-
 public class SimpleConsumerDemo {
     
   private static void printMessages(ByteBufferMessageSet messageSet) {

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala Thu May 31 01:51:23 2012
@@ -17,15 +17,12 @@
 
 package kafka.perf
 
-import java.net.URI
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicLong
 import java.nio.channels.ClosedByInterruptException
-import joptsimple._
 import org.apache.log4j.Logger
 import kafka.message.Message
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZKStringSerializer, Utils}
+import kafka.utils.Utils
 import java.util.{Random, Properties}
 import kafka.consumer._
 import java.text.SimpleDateFormat
@@ -139,7 +136,7 @@ object ConsumerPerformance {
     val hideHeader = options.has(hideHeaderOpt)
   }
 
-  class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaMessageStream[Message],
+  class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Message],
                            config:ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
     extends Thread(name) {
     private val shutdownLatch = new CountDownLatch(1)
@@ -157,9 +154,9 @@ object ConsumerPerformance {
       var lastMessagesRead = 0L
 
       try {
-        for (message <- stream if messagesRead < config.numMessages) {
+        for (messageAndMetadata <- stream if messagesRead < config.numMessages) {
           messagesRead += 1
-          bytesRead += message.payloadSize
+          bytesRead += messageAndMetadata.message.payloadSize
 
           if (messagesRead % config.reportingInterval == 0) {
             if(config.showDetailedStats)

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/PerfConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/PerfConfig.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/PerfConfig.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/PerfConfig.scala Thu May 31 01:51:23 2012
@@ -18,7 +18,7 @@
 package kafka.perf
 
 import joptsimple.OptionParser
-import java.text.SimpleDateFormat
+
 
 class PerfConfig(args: Array[String]) {
   val parser = new OptionParser

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala Thu May 31 01:51:23 2012
@@ -20,18 +20,16 @@ package kafka.perf
 import java.util.concurrent.{CountDownLatch, Executors}
 import java.util.concurrent.atomic.AtomicLong
 import kafka.producer._
-import async.DefaultEventHandler
 import org.apache.log4j.Logger
-import joptsimple.OptionParser
 import kafka.message.{CompressionCodec, Message}
-import kafka.serializer.DefaultEncoder
 import java.text.SimpleDateFormat
-import java.util.{Date, Random, Properties}
+import java.util.{Random, Properties}
+import kafka.utils.Logging
 
 /**
  * Load test for the producer
  */
-object ProducerPerformance {
+object ProducerPerformance extends Logging {
 
   def main(args: Array[String]) {
 
@@ -141,7 +139,6 @@ object ProducerPerformance {
                        val totalMessagesSent: AtomicLong,
                        val allDone: CountDownLatch,
                        val rand: Random) extends Runnable {
-    val logger = Logger.getLogger(getClass)
     val props = new Properties()
     val brokerInfoList = config.brokerInfo.split("=")
     if (brokerInfoList(0) == "zk.connect") {
@@ -171,7 +168,7 @@ object ProducerPerformance {
       var lastReportTime = reportTime
       val messagesPerThread = if(!config.isAsync) config.numMessages / config.numThreads / config.batchSize
                               else config.numMessages / config.numThreads
-      if(logger.isDebugEnabled) logger.debug("Messages per thread = " + messagesPerThread)
+      debug("Messages per thread = " + messagesPerThread)
       var messageSet: List[Message] = Nil
       if(config.isFixSize) {
         for(k <- 0 until config.batchSize) {
@@ -203,11 +200,11 @@ object ProducerPerformance {
               rand.nextBytes(messageBytes)
               val message = new Message(messageBytes)
               producer.send(new ProducerData[Message,Message](config.topic, message))
-              if(logger.isDebugEnabled) println("checksum:" + message.checksum)
+              debug(config.topic + "-checksum:" + message.checksum)
               bytesSent += message.payloadSize
             }else {
               producer.send(new ProducerData[Message,Message](config.topic, message))
-              if(logger.isDebugEnabled) println("checksum:" + message.checksum)
+              debug(config.topic + "-checksum:" + message.checksum)
               bytesSent += message.payloadSize
             }
             nSends += 1

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh Thu May 31 01:51:23 2012
@@ -73,7 +73,9 @@ readonly num_iterations=5
 readonly zk_source_port=2181
 readonly zk_mirror_port=2182
 
-readonly topic_1=test01
+readonly topic_prefix=test
+readonly max_topic_id=2
+readonly unbalanced_start_id=2
 readonly consumer_grp=group1
 readonly source_console_consumer_grp=source
 readonly mirror_console_consumer_grp=mirror
@@ -96,10 +98,16 @@ readonly num_kafka_target_server=3
 readonly wait_time_after_killing_broker=0
 readonly wait_time_after_restarting_broker=5
 
-background_producer_pid=
+readonly producer_4_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093,4:localhost:9094"
+readonly producer_3_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093"
+
+background_producer_pid_1=
+background_producer_pid_2=
+
 no_bouncing=$#
 
 iter=1
+abort_test=false
 
 pid_zk_source=
 pid_zk_target=
@@ -177,17 +185,29 @@ get_random_range() {
 
 verify_consumer_rebalancing() {
 
-   info "Verifying consumer rebalancing operation"
+    info "Verifying consumer rebalancing operation"
 
-    $base_dir/bin/kafka-run-class.sh \
-        kafka.tools.VerifyConsumerRebalance \
-        --zk.connect=localhost:2181 \
-        --group $consumer_grp \
-     2>&1 >> $consumer_rebalancing_log
+    CONSUMER_REBALANCING_RESULT=`$base_dir/bin/kafka-run-class.sh \
+                                 kafka.tools.VerifyConsumerRebalance \
+                                 --zk.connect=localhost:2181 \
+                                 --group $consumer_grp`
+    echo "$CONSUMER_REBALANCING_RESULT" >> $consumer_rebalancing_log
+
+    REBALANCE_STATUS_LINE=`grep "Rebalance operation" $consumer_rebalancing_log | tail -1`
+    # info "REBALANCE_STATUS_LINE: $REBALANCE_STATUS_LINE"
+    REBALANCE_STATUS=`echo $REBALANCE_STATUS_LINE | grep "Rebalance operation successful" || echo -n "Rebalance operation failed"`
+    info "REBALANCE_STATUS: $REBALANCE_STATUS"
+
+    if [ "${REBALANCE_STATUS}_x" == "Rebalance operation failed_x" ]; then
+        info "setting abort_test to true due to Rebalance operation failed"
+        abort_test="true"
+    fi
 }
 
 wait_for_zero_consumer_lags() {
 
+    topic_id=$1
+
     # no of times to check for zero lagging
     no_of_zero_to_verify=3
 
@@ -196,7 +216,7 @@ wait_for_zero_consumer_lags() {
         TOTAL_LAG=0
         CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
                        --group $consumer_grp --zkconnect localhost:$zk_source_port \
-                       --topic $topic_1 | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
+                       --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
 
         for lag in $CONSUMER_LAGS;
         do
@@ -217,6 +237,8 @@ wait_for_zero_consumer_lags() {
 
 wait_for_zero_source_console_consumer_lags() {
 
+    topic_id=$1
+
     # no of times to check for zero lagging
     no_of_zero_to_verify=3
 
@@ -225,7 +247,7 @@ wait_for_zero_source_console_consumer_la
         TOTAL_LAG=0
         CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
                        --group $source_console_consumer_grp --zkconnect localhost:$zk_source_port \
-                       --topic $topic_1 | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
+                       --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
 
         for lag in $CONSUMER_LAGS;
         do
@@ -246,6 +268,8 @@ wait_for_zero_source_console_consumer_la
 
 wait_for_zero_mirror_console_consumer_lags() {
 
+    topic_id=$1
+
     # no of times to check for zero lagging
     no_of_zero_to_verify=3
 
@@ -254,7 +278,7 @@ wait_for_zero_mirror_console_consumer_la
         TOTAL_LAG=0
         CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
                        --group $mirror_console_consumer_grp --zkconnect localhost:$zk_mirror_port \
-                       --topic $topic_1 | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
+                       --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='`
 
         for lag in $CONSUMER_LAGS;
         do
@@ -321,6 +345,8 @@ cleanup() {
     rm -f $console_consumer_source_crc_sorted_log
     rm -f $console_consumer_mirror_crc_sorted_uniq_log
     rm -f $console_consumer_source_crc_sorted_uniq_log
+
+    rm -f $consumer_rebalancing_log
 }
 
 start_zk() {
@@ -380,40 +406,65 @@ start_embedded_consumer_server() {
 }
 
 start_console_consumer_for_source_producer() {
-    info "starting console consumers for source producer"
+
+    topic_id=$1
+
+    info "starting console consumers for source producer on topic id [$topic_id]"
 
     $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \
         --zookeeper localhost:$zk_source_port \
-        --topic $topic_1 \
+        --topic ${topic_prefix}_${topic_id} \
         --group $source_console_consumer_grp \
-        --from-beginning \
+        --from-beginning --consumer-timeout-ms 5000 \
         --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \
-        2>&1 > ${console_consumer_source_log} &
-    console_consumer_source_pid=$!
-
-    info "  -> console consumer source pid: $console_consumer_source_pid"
+        --property topic=${topic_prefix}_${topic_id} \
+        2>&1 >> ${console_consumer_source_log} 
 }
 
 start_console_consumer_for_mirror_producer() {
-    info "starting console consumers for mirroring producer"
+
+    topic_id=$1
+
+    info "starting console consumers for mirroring producer on topic id [$topic_id]"
 
     $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \
         --zookeeper localhost:$zk_mirror_port \
-        --topic $topic_1 \
+        --topic ${topic_prefix}_${topic_id} \
         --group $mirror_console_consumer_grp \
-        --from-beginning \
+        --from-beginning --consumer-timeout-ms 5000 \
         --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \
-        2>&1 > ${console_consumer_mirror_log} &
-    console_consumer_mirror_pid=$!
+        --property topic=${topic_prefix}_${topic_id} \
+        2>&1 >> ${console_consumer_mirror_log} 
+}
 
-    info "  -> console consumer mirror pid: $console_consumer_mirror_pid"
+consume_source_producer_messages() {
+    consumer_counter=1
+    while [ $consumer_counter -le $max_topic_id ]
+    do
+        start_console_consumer_for_source_producer $consumer_counter
+        consumer_counter=$(( $consumer_counter + 1 ))
+    done
+}
+
+consume_mirror_producer_messages() {
+    consumer_counter=1
+    while [ $consumer_counter -le $max_topic_id ]
+    do
+        start_console_consumer_for_mirror_producer $consumer_counter
+        consumer_counter=$(( $consumer_counter + 1 ))
+    done
 }
 
 shutdown_producer() {
     info "shutting down producer"
-    if [ "x${background_producer_pid}" != "x" ]; then
-        # kill_child_processes 0 ${background_producer_pid};
-        kill -TERM ${background_producer_pid} 2> /dev/null;
+    if [ "x${background_producer_pid_1}" != "x" ]; then
+        # kill_child_processes 0 ${background_producer_pid_1};
+        kill -TERM ${background_producer_pid_1} 2> /dev/null;
+    fi
+
+    if [ "x${background_producer_pid_2}" != "x" ]; then
+        # kill_child_processes 0 ${background_producer_pid_2};
+        kill -TERM ${background_producer_pid_2} 2> /dev/null;
     fi
 }
 
@@ -450,13 +501,15 @@ shutdown_servers() {
 }
 
 start_background_producer() {
+    bkrinfo_str=$1
+    start_topic_id=$2
+    end_topic_id=$3
 
     batch_no=0
-    curr_iter=0
+    topic_id=${start_topic_id}
 
-    while [ $num_iterations -gt $curr_iter ]
+    while [ 'x' == 'x' ]
     do
-        topic=$1
         sleeptime=
 
         get_random_range $sleep_min $sleep_max
@@ -464,19 +517,24 @@ start_background_producer() {
 
         batch_no=$(($batch_no + 1))
 
+        if [ $topic_id -gt $end_topic_id ]; then
+            topic_id=${start_topic_id}
+        fi
+
         $base_dir/bin/kafka-run-class.sh \
             kafka.perf.ProducerPerformance \
-            --brokerinfo zk.connect=localhost:2181 \
-            --topic $topic \
+            --brokerinfo $bkrinfo_str \
+            --topic ${topic_prefix}_${topic_id} \
             --messages $num_msg_per_batch \
             --message-size $message_size \
             --batch-size 50 \
             --vary-message-size \
             --threads 1 \
-            --reporting-interval $num_msg_per_batch \
-            --async \
+            --reporting-interval $num_msg_per_batch --async \
             2>&1 >> $base_dir/producer_performance.log    # appending all producers' msgs
 
+        topic_id=$(( $topic_id + 1 ))
+
         sleep $sleeptime
     done
 }
@@ -485,9 +543,9 @@ cmp_checksum() {
 
     cmp_result=0
 
-    grep ^checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log
-    grep ^checksum $console_consumer_mirror_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_mirror_crc_log
-    grep ^checksum $producer_performance_log | tr -d ' ' | cut -f2 -d ':' > $producer_performance_crc_log
+    grep checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log
+    grep checksum $console_consumer_mirror_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_mirror_crc_log
+    grep checksum $producer_performance_log | tr -d ' ' | cut -f4 -d ':' | cut -f1 -d '(' > $producer_performance_crc_log
 
     sort $console_consumer_mirror_crc_log > $console_consumer_mirror_crc_sorted_log
     sort $console_consumer_source_crc_log > $console_consumer_source_crc_sorted_log
@@ -555,6 +613,37 @@ cmp_checksum() {
     echo "========================================================" >> $checksum_diff_log
     echo "${duplicate_mirror_crc}"                                  >> $checksum_diff_log
 
+    topic_chksum_counter=1
+    while [ $topic_chksum_counter -le $max_topic_id ]
+    do
+        # get producer topic counts
+        this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $producer_performance_log`
+        echo "PRODUCER topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}"
+
+        topic_chksum_counter=$(($topic_chksum_counter + 1))
+    done
+    echo
+
+    topic_chksum_counter=1
+    while [ $topic_chksum_counter -le $max_topic_id ]
+    do
+        this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_source_log`
+        echo "SOURCE consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}"
+
+        topic_chksum_counter=$(($topic_chksum_counter + 1))
+    done
+    echo
+
+    topic_chksum_counter=1
+    while [ $topic_chksum_counter -le $max_topic_id ]
+    do
+        this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_mirror_log`
+        echo "MIRROR consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}"
+
+        topic_chksum_counter=$(($topic_chksum_counter + 1))
+    done
+    echo
+
     return $cmp_result
 }
 
@@ -567,15 +656,32 @@ start_test() {
     start_target_servers_cluster
     sleep 2
 
-    start_background_producer $topic_1 &
-    background_producer_pid=$!
+    start_background_producer $producer_4_brokerinfo_str 1 $(( $unbalanced_start_id - 1 )) &
+    background_producer_pid_1=$!
 
     info "=========================================="
-    info "Started background producer pid [${background_producer_pid}]"
+    info "Started background producer pid [${background_producer_pid_1}]"
     info "=========================================="
 
-    sleep 5
-    
+    sleep 10
+   
+    start_background_producer $producer_3_brokerinfo_str $unbalanced_start_id $max_topic_id &
+    background_producer_pid_2=$!
+
+    info "=========================================="
+    info "Started background producer pid [${background_producer_pid_2}]"
+    info "=========================================="
+
+    sleep 10
+
+    verify_consumer_rebalancing
+
+    info "abort_test: [${abort_test}]"
+    if [ "${abort_test}_x" == "true_x" ]; then
+        info "aborting test"
+        iter=$((${num_iterations} + 1))
+    fi
+ 
     while [ $num_iterations -ge $iter ]
     do
         echo
@@ -592,7 +698,6 @@ start_test() {
                 # even iterations -> bounce target kafka borker
                 get_random_range 1 $num_kafka_target_server 
                 idx=$?
-
                 if [ "x${kafka_target_pids[$idx]}" != "x" ]; then
                     echo
                     info "#### Bouncing kafka TARGET broker ####"
@@ -631,7 +736,15 @@ start_test() {
                     sleep $wait_time_after_restarting_broker
                 fi
             fi
+
             verify_consumer_rebalancing
+
+            info "abort_test: [${abort_test}]"
+            if [ "${abort_test}_x" == "true_x" ]; then
+                info "aborting test"
+                iter=$((${num_iterations} + 1))
+            fi
+
         else
             info "No bouncing performed"
         fi
@@ -670,8 +783,8 @@ trap "shutdown_producer; shutdown_server
 
 start_test
 
-start_console_consumer_for_source_producer
-start_console_consumer_for_mirror_producer
+consume_source_producer_messages
+consume_mirror_producer_messages
 
 wait_for_zero_source_console_consumer_lags
 wait_for_zero_mirror_console_consumer_lags

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/log4j.properties?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/log4j.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/log4j.properties Thu May 31 01:51:23 2012
@@ -12,30 +12,74 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=INFO, stdout
 
+log4j.rootLogger=INFO, stdout, kafkaAppender
+
+# ====================================
+# messages going to kafkaAppender
+# ====================================
+log4j.logger.kafka=DEBUG, kafkaAppender
+log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, kafkaAppender
+log4j.logger.org.apache.zookeeper=INFO, kafkaAppender
+
+# ====================================
+# messages going to zookeeperAppender
+# ====================================
+# (comment out this line to redirect ZK-related messages to kafkaAppender
+#  to allow reading both Kafka and ZK debugging messages in a single file)
+#log4j.logger.org.apache.zookeeper=INFO, zookeeperAppender
+
+# ====================================
+# stdout
+# ====================================
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
 
-#log4j.appender.fileAppender=org.apache.log4j.FileAppender
-#log4j.appender.fileAppender.File=kafka-request.log
-#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
-#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
-
-
-# Turn on all our debugging info
-#log4j.logger.kafka=INFO
-log4j.logger.org.I0Itec.zkclient.ZkClient=INFO
-log4j.logger.org.apache.zookeeper=INFO
-log4j.logger.kafka.consumer=DEBUG
-log4j.logger.kafka.server.EmbeddedConsumer$MirroringThread=TRACE
-log4j.logger.kafka.server.KafkaRequestHandlers=TRACE
+# ====================================
+# fileAppender
+# ====================================
+log4j.appender.fileAppender=org.apache.log4j.FileAppender
+log4j.appender.fileAppender.File=/tmp/kafka_all_request.log
+log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.fileAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# ====================================
+# kafkaAppender
+# ====================================
+log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.kafkaAppender.File=/tmp/kafka.log
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.additivity.kafka=true
+
+# ====================================
+# zookeeperAppender
+# ====================================
+log4j.appender.zookeeperAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.zookeeperAppender.File=/tmp/zookeeper.log
+log4j.appender.zookeeperAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.zookeeperAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.additivity.org.apache.zookeeper=false
+
+# ====================================
+# other available debugging info 
+# ====================================
+#log4j.logger.kafka.server.EmbeddedConsumer$MirroringThread=TRACE
+#log4j.logger.kafka.server.KafkaRequestHandlers=TRACE
 #log4j.logger.kafka.producer.async.AsyncProducer=TRACE
 #log4j.logger.kafka.producer.async.ProducerSendThread=TRACE
-log4j.logger.kafka.producer.async.DefaultEventHandler=TRACE
+#log4j.logger.kafka.producer.async.DefaultEventHandler=TRACE
+
+log4j.logger.kafka.consumer=DEBUG
 log4j.logger.kafka.tools.VerifyConsumerRebalance=DEBUG
 
 # to print message checksum from ProducerPerformance
-log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG 
+log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
+
+# to print socket buffer size validated by Kafka broker
+log4j.logger.kafka.network.Acceptor=DEBUG
+
+# to print socket buffer size validated by SimpleConsumer
+log4j.logger.kafka.consumer.SimpleConsumer=TRACE
 

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer1.properties?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer1.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer1.properties Thu May 31 01:51:23 2012
@@ -15,7 +15,7 @@
 # zk connection string
 # comma separated host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-broker.list=0:localhost:9093
+broker.list=0:localhost:9081
 
 # timeout in ms for connecting to zookeeper
 zk.connectiontimeout.ms=1000000

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer2.properties?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer2.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer2.properties Thu May 31 01:51:23 2012
@@ -15,7 +15,7 @@
 # zk connection string
 # comma separated host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-broker.list=0:localhost:9094
+broker.list=0:localhost:9082
 
 # timeout in ms for connecting to zookeeper
 zk.connectiontimeout.ms=1000000

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer3.properties?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer3.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer3.properties Thu May 31 01:51:23 2012
@@ -15,7 +15,7 @@
 # zk connection string
 # comma separated host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-broker.list=0:localhost:9095
+broker.list=0:localhost:9083
 
 # timeout in ms for connecting to zookeeper
 zk.connectiontimeout.ms=1000000

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source1.properties?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source1.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source1.properties Thu May 31 01:51:23 2012
@@ -26,7 +26,7 @@ brokerid=1
 num.partitions=1
 
 # the port the socket server runs on
-port=9092
+port=9091
 
 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine
 num.threads=8

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source2.properties?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source2.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source2.properties Thu May 31 01:51:23 2012
@@ -26,7 +26,7 @@ brokerid=2
 num.partitions=1
 
 # the port the socket server runs on
-port=9091
+port=9092
 
 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine
 num.threads=8

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source3.properties?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source3.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source3.properties Thu May 31 01:51:23 2012
@@ -26,7 +26,7 @@ brokerid=3
 num.partitions=1
 
 # the port the socket server runs on
-port=9090
+port=9093
 
 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine
 num.threads=8

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source4.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source4.properties?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source4.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source4.properties Thu May 31 01:51:23 2012
@@ -26,7 +26,7 @@ brokerid=4
 num.partitions=1
 
 # the port the socket server runs on
-port=9096
+port=9094
 
 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine
 num.threads=8

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target1.properties?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target1.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target1.properties Thu May 31 01:51:23 2012
@@ -26,7 +26,7 @@ brokerid=1
 num.partitions=1
 
 # the port the socket server runs on
-port=9093
+port=9081
 
 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine
 num.threads=8

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target2.properties?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target2.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target2.properties Thu May 31 01:51:23 2012
@@ -26,7 +26,7 @@ brokerid=2
 num.partitions=1
 
 # the port the socket server runs on
-port=9094
+port=9082
 
 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine
 num.threads=8

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target3.properties?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target3.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target3.properties Thu May 31 01:51:23 2012
@@ -26,7 +26,7 @@ brokerid=3
 num.partitions=1
 
 # the port the socket server runs on
-port=9095
+port=9083
 
 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine
 num.threads=8

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/config/whitelisttest.consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/config/whitelisttest.consumer.properties?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/config/whitelisttest.consumer.properties (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/config/whitelisttest.consumer.properties Thu May 31 01:51:23 2012
@@ -25,5 +25,5 @@ zk.connectiontimeout.ms=1000000
 #consumer group id
 groupid=group1
 
-mirror.topics.whitelist=test01
-
+mirror.topics.whitelist=test_1,test_2
+autooffset.reset=smallest



Mime
View raw message