kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1310645 [2/2] - in /incubator/kafka/trunk: contrib/hadoop-consumer/src/main/java/kafka/etl/ contrib/hadoop-consumer/src/main/java/kafka/etl/impl/ contrib/hadoop-producer/src/main/java/kafka/bridge/examples/ contrib/hadoop-producer/src/main...
Date Sat, 07 Apr 2012 00:04:54 GMT
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala Sat Apr  7 00:04:51 2012
@@ -17,36 +17,21 @@
 
 package kafka.server
 
-import kafka.utils.{Utils, Logging}
-import kafka.consumer._
-import kafka.producer.{ProducerData, ProducerConfig, Producer}
-import kafka.message.Message
-import java.util.concurrent.CountDownLatch
-
-import scala.collection.Map
-
-class KafkaServerStartable(val serverConfig: KafkaConfig,
-                           val consumerConfig: ConsumerConfig,
-                           val producerConfig: ProducerConfig) extends Logging {
+import kafka.utils.Logging
+
+
+class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
   private var server : KafkaServer = null
-  private var embeddedConsumer : EmbeddedConsumer = null
 
   init
 
-  def this(serverConfig: KafkaConfig) = this(serverConfig, null, null)
-
   private def init() {
     server = new KafkaServer(serverConfig)
-    if (consumerConfig != null)
-      embeddedConsumer =
-        new EmbeddedConsumer(consumerConfig, producerConfig, this)
   }
 
   def startup() {
     try {
       server.startup()
-      if (embeddedConsumer != null)
-        embeddedConsumer.startup()
     }
     catch {
       case e =>
@@ -57,8 +42,6 @@ class KafkaServerStartable(val serverCon
 
   def shutdown() {
     try {
-      if (embeddedConsumer != null)
-        embeddedConsumer.shutdown()
       server.shutdown()
     }
     catch {
@@ -73,153 +56,4 @@ class KafkaServerStartable(val serverCon
   }
 }
 
-class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
-                       private val producerConfig: ProducerConfig,
-                       private val kafkaServerStartable: KafkaServerStartable) extends TopicEventHandler[String] with Logging {
-
-  private val whiteListTopics =
-    consumerConfig.mirrorTopicsWhitelist.split(",").toList.map(_.trim)
-
-  private val blackListTopics =
-    consumerConfig.mirrorTopicsBlackList.split(",").toList.map(_.trim)
-
-  // mirrorTopics should be accessed by handleTopicEvent only
-  private var mirrorTopics:Seq[String] = List()
-
-  private var consumerConnector: ConsumerConnector = null
-  private var topicEventWatcher:ZookeeperTopicEventWatcher = null
-
-  private val producer = new Producer[Null, Message](producerConfig)
-
-  var threadList = List[MirroringThread]()
-
-  private def isTopicAllowed(topic: String) = {
-    if (consumerConfig.mirrorTopicsWhitelist.nonEmpty)
-      whiteListTopics.contains(topic)
-    else
-      !blackListTopics.contains(topic)
-  }
-
-  // TopicEventHandler call-back only
-  @Override
-  def handleTopicEvent(allTopics: Seq[String]) {
-    val newMirrorTopics = allTopics.filter(isTopicAllowed)
-
-    val addedTopics = newMirrorTopics filterNot (mirrorTopics contains)
-    if (addedTopics.nonEmpty)
-      info("topic event: added topics = %s".format(addedTopics))
-
-    val deletedTopics = mirrorTopics filterNot (newMirrorTopics contains)
-    if (deletedTopics.nonEmpty)
-      info("topic event: deleted topics = %s".format(deletedTopics))
-
-    mirrorTopics = newMirrorTopics
-
-    if (addedTopics.nonEmpty || deletedTopics.nonEmpty) {
-      info("mirror topics = %s".format(mirrorTopics))
-      startNewConsumerThreads(makeTopicMap(mirrorTopics))
-    }
-  }
-
-  private def makeTopicMap(mirrorTopics: Seq[String]) = {
-    if (mirrorTopics.nonEmpty)
-      Utils.getConsumerTopicMap(mirrorTopics.mkString(
-        "", ":%d,".format(consumerConfig.mirrorConsumerNumThreads),
-        ":%d".format(consumerConfig.mirrorConsumerNumThreads)))
-    else
-      Utils.getConsumerTopicMap("")
-  }
-
-  private def startNewConsumerThreads(topicMap: Map[String, Int]) {
-    if (topicMap.nonEmpty) {
-      if (consumerConnector != null)
-        consumerConnector.shutdown()
-
-      /**
-       * Before starting new consumer threads for the updated set of topics,
-       * shutdown the existing mirroring threads. Since the consumer connector
-       * is already shutdown, the mirroring threads should finish their task almost
-       * instantaneously. If they don't, this points to an error that needs to be looked
-       * into, and further mirroring should stop
-       */
-      threadList.foreach(_.shutdown)
-
-      // KAFKA: 212: clear the thread list to remove the older thread references that are already shutdown
-      threadList = Nil
-
-      consumerConnector = Consumer.create(consumerConfig)
-      val topicMessageStreams =  consumerConnector.createMessageStreams(topicMap)
-      for ((topic, streamList) <- topicMessageStreams)
-        for (i <- 0 until streamList.length)
-          threadList ::= new MirroringThread(streamList(i), topic, i)
-
-      threadList.foreach(_.start)
-    }
-    else
-      info("Not starting mirroring threads (mirror topic list is empty)")
-  }
-
-  def startup() {
-    info("staring up embedded consumer")
-    topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this, kafkaServerStartable)
-    /*
-    * consumer threads are (re-)started upon topic events (which includes an
-    * initial startup event which lists the current topics)
-    */
-  }
-
-  def shutdown() {
-    // first shutdown the topic watcher to prevent creating new consumer streams
-    if (topicEventWatcher != null)
-      topicEventWatcher.shutdown()
-    info("Stopped the ZK watcher for new topics, now stopping the Kafka consumers")
-    // stop pulling more data for mirroring
-    if (consumerConnector != null)
-      consumerConnector.shutdown()
-    info("Stopped the kafka consumer threads for existing topics, now stopping the existing mirroring threads")
-    // wait for all mirroring threads to stop
-    threadList.foreach(_.shutdown)
-    info("Stopped all existing mirroring threads, now stopping the producer")
-    // only then, shutdown the producer
-    producer.close()
-    info("Successfully shutdown this Kafka mirror")
-  }
-
-  class MirroringThread(val stream: KafkaMessageStream[Message], val topic: String, val threadId: Int) extends Thread with Logging {
-    val shutdownComplete = new CountDownLatch(1)
-    val name = "kafka-embedded-consumer-%s-%d".format(topic, threadId)
-    this.setDaemon(false)
-    this.setName(name)
-
-
-    override def run = {
-      info("Starting mirroring thread %s for topic %s and stream %d".format(name, topic, threadId))
-
-      try {
-        for (message <- stream) {
-          trace("Mirroring thread received message " + message.checksum)
-          val pd = new ProducerData[Null, Message](topic, message)
-          producer.send(pd)
-        }
-      }
-      catch {
-        case e =>
-          fatal(topic + " stream " + threadId + " unexpectedly exited", e)
-      }finally {
-        shutdownComplete.countDown
-        info("Stopped mirroring thread %s for topic %s and stream %d".format(name, topic, threadId))
-      }
-    }
-
-    def shutdown = {
-      try {
-        shutdownComplete.await
-      }catch {
-        case e: InterruptedException => fatal("Shutdown of thread " + name + " interrupted. " +
-          "Mirroring thread might leak data!")
-      }
-    }
-  }
-}
-
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala Sat Apr  7 00:04:51 2012
@@ -82,15 +82,15 @@ object ConsumerShell {
   }
 }
 
-class ZKConsumerThread(stream: KafkaMessageStream[String]) extends Thread with Logging {
+class ZKConsumerThread(stream: KafkaStream[String]) extends Thread with Logging {
   val shutdownLatch = new CountDownLatch(1)
 
   override def run() {
     println("Starting consumer thread..")
     var count: Int = 0
     try {
-      for (message <- stream) {
-        println("consumed: " + message)
+      for (messageAndMetadata <- stream) {
+        println("consumed: " + messageAndMetadata.message)
         count += 1
       }
     }catch {

Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala Sat Apr  7 00:04:51 2012
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import kafka.message.Message
+import joptsimple.OptionParser
+import kafka.utils.{Utils, Logging}
+import kafka.producer.{ProducerData, ProducerConfig, Producer}
+import scala.collection.JavaConversions._
+import java.util.concurrent.CountDownLatch
+import kafka.consumer._
+
+
+object MirrorMaker extends Logging {
+
+  def main(args: Array[String]) {
+    
+    info ("Starting mirror maker")
+    val parser = new OptionParser
+
+    val consumerConfigOpt = parser.accepts("consumer-config",
+      "Consumer config to consume from a source cluster. " +
+      "You may specify multiple of these.")
+      .withRequiredArg()
+      .describedAs("config file")
+      .ofType(classOf[String])
+
+    val producerConfigOpt = parser.accepts("producer-config",
+      "Embedded producer config.")
+      .withRequiredArg()
+      .describedAs("config file")
+      .ofType(classOf[String])
+    
+    val numStreamsOpt = parser.accepts("num-streams",
+      "Number of consumption streams.")
+      .withRequiredArg()
+      .describedAs("Number of threads")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+    
+    val whitelistOpt = parser.accepts("whitelist",
+      "Whitelist of topics to mirror.")
+      .withRequiredArg()
+      .describedAs("Java regex (String)")
+      .ofType(classOf[String])
+
+    val blacklistOpt = parser.accepts("blacklist",
+            "Blacklist of topics to mirror.")
+            .withRequiredArg()
+            .describedAs("Java regex (String)")
+            .ofType(classOf[String])
+
+    val helpOpt = parser.accepts("help", "Print this message.")
+
+    val options = parser.parse(args : _*)
+
+    if (options.has(helpOpt)) {
+      parser.printHelpOn(System.out)
+      System.exit(0)
+    }
+
+    Utils.checkRequiredArgs(
+      parser, options, consumerConfigOpt, producerConfigOpt)
+    if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
+      println("Exactly one of whitelist or blacklist is required.")
+      System.exit(1)
+    }
+
+    val numStreams = options.valueOf(numStreamsOpt)
+
+    val producer = {
+      val config = new ProducerConfig(
+        Utils.loadProps(options.valueOf(producerConfigOpt)))
+      new Producer[Null, Message](config)
+    }
+
+    val threads = {
+      val connectors = options.valuesOf(consumerConfigOpt).toList
+              .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString)))
+              .map(new ZookeeperConsumerConnector(_))
+
+      Runtime.getRuntime.addShutdownHook(new Thread() {
+        override def run() {
+          connectors.foreach(_.shutdown())
+          producer.close()
+        }
+      })
+
+      val filterSpec = if (options.has(whitelistOpt))
+        new Whitelist(options.valueOf(whitelistOpt))
+      else
+        new Blacklist(options.valueOf(blacklistOpt))
+
+      val streams =
+        connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue()))
+
+      streams.flatten.zipWithIndex.map(streamAndIndex => {
+        new MirrorMakerThread(streamAndIndex._1, producer, streamAndIndex._2)
+      })
+    }
+
+    threads.foreach(_.start())
+
+    threads.foreach(_.awaitShutdown())
+  }
+
+  class MirrorMakerThread(stream: KafkaStream[Message],
+                          producer: Producer[Null, Message],
+                          threadId: Int)
+          extends Thread with Logging {
+
+    private val shutdownLatch = new CountDownLatch(1)
+    private val threadName = "mirrormaker-" + threadId
+
+    this.setName(threadName)
+
+    override def run() {
+      try {
+        for (msgAndMetadata <- stream) {
+          val pd = new ProducerData[Null, Message](
+            msgAndMetadata.topic, msgAndMetadata.message)
+          producer.send(pd)
+        }
+      }
+      catch {
+        case e =>
+          fatal("%s stream unexpectedly exited.", e)
+      }
+      finally {
+        shutdownLatch.countDown()
+        info("Stopped thread %s.".format(threadName))
+      }
+    }
+
+    def awaitShutdown() {
+      try {
+        shutdownLatch.await()
+      }
+      catch {
+        case e: InterruptedException => fatal(
+          "Shutdown of thread %s interrupted. This might leak data!"
+                  .format(threadName))
+      }
+    }
+  }
+}
+

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Sat Apr  7 00:04:51 2012
@@ -34,8 +34,6 @@ object ReplayLogProducer extends Logging
   private val GROUPID: String = "replay-log-producer"
 
   def main(args: Array[String]) {
-    var isNoPrint = false;
-
     val config = new Config(args)
 
     val executor = Executors.newFixedThreadPool(config.numThreads)
@@ -153,7 +151,7 @@ object ReplayLogProducer extends Logging
     }
   }
 
-  class ZKConsumerThread(config: Config, stream: KafkaMessageStream[Message]) extends Thread with Logging {
+  class ZKConsumerThread(config: Config, stream: KafkaStream[Message]) extends Thread with Logging {
     val shutdownLatch = new CountDownLatch(1)
     val props = new Properties()
     val brokerInfoList = config.brokerInfo.split("=")
@@ -184,9 +182,9 @@ object ReplayLogProducer extends Logging
             stream.slice(0, config.numMessages)
           else
             stream
-        for (message <- iter) {
+        for (messageAndMetadata <- iter) {
           try {
-            producer.send(new ProducerData[Message, Message](config.outputTopic, message))
+            producer.send(new ProducerData[Message, Message](config.outputTopic, messageAndMetadata.message))
             if (config.delayedMSBtwSend > 0 && (messageCount + 1) % config.batchSize == 0)
               Thread.sleep(config.delayedMSBtwSend)
             messageCount += 1

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Logging.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Logging.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Logging.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Logging.scala Sat Apr  7 00:04:51 2012
@@ -23,72 +23,76 @@ trait Logging {
   val loggerName = this.getClass.getName
   lazy val logger = Logger.getLogger(loggerName)
 
+  protected var logIdent = ""
+  
+  private def msgWithLogIdent(msg: String) = "%s%s".format(logIdent, msg)
+
   def trace(msg: => String): Unit = {
     if (logger.isTraceEnabled())
-      logger.trace(msg)	
+      logger.trace(msgWithLogIdent(msg))
   }
   def trace(e: => Throwable): Any = {
     if (logger.isTraceEnabled())
-      logger.trace("",e)	
+      logger.trace(logIdent,e)
   }
   def trace(msg: => String, e: => Throwable) = {
     if (logger.isTraceEnabled())
-      logger.trace(msg,e)
+      logger.trace(msgWithLogIdent(msg),e)
   }
 
   def debug(msg: => String): Unit = {
     if (logger.isDebugEnabled())
-      logger.debug(msg)
+      logger.debug(msgWithLogIdent(msg))
   }
   def debug(e: => Throwable): Any = {
     if (logger.isDebugEnabled())
-      logger.debug("",e)	
+      logger.debug(logIdent,e)
   }
   def debug(msg: => String, e: => Throwable) = {
     if (logger.isDebugEnabled())
-      logger.debug(msg,e)
+      logger.debug(msgWithLogIdent(msg),e)
   }
 
   def info(msg: => String): Unit = {
     if (logger.isInfoEnabled())
-      logger.info(msg)
+      logger.info(msgWithLogIdent(msg))
   }
   def info(e: => Throwable): Any = {
     if (logger.isInfoEnabled())
-      logger.info("",e)
+      logger.info(logIdent,e)
   }
   def info(msg: => String,e: => Throwable) = {
     if (logger.isInfoEnabled())
-      logger.info(msg,e)
+      logger.info(msgWithLogIdent(msg),e)
   }
 
   def warn(msg: => String): Unit = {
-    logger.warn(msg)
+    logger.warn(msgWithLogIdent(msg))
   }
   def warn(e: => Throwable): Any = {
-    logger.warn("",e)
+    logger.warn(logIdent,e)
   }
   def warn(msg: => String, e: => Throwable) = {
-    logger.warn(msg,e)
+    logger.warn(msgWithLogIdent(msg),e)
   }	
 
   def error(msg: => String): Unit = {
-    logger.error(msg)
+    logger.error(msgWithLogIdent(msg))
   }		
   def error(e: => Throwable): Any = {
-    logger.error("",e)
+    logger.error(logIdent,e)
   }
   def error(msg: => String, e: => Throwable) = {
-    logger.error(msg,e)
+    logger.error(msgWithLogIdent(msg),e)
   }
 
   def fatal(msg: => String): Unit = {
-    logger.fatal(msg)
+    logger.fatal(msgWithLogIdent(msg))
   }
   def fatal(e: => Throwable): Any = {
-    logger.fatal("",e)
+    logger.fatal(logIdent,e)
   }	
   def fatal(msg: => String, e: => Throwable) = {
-    logger.fatal(msg,e)
+    logger.fatal(msgWithLogIdent(msg),e)
   }
 }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala Sat Apr  7 00:04:51 2012
@@ -29,12 +29,13 @@ import scala.collection._
 import scala.collection.mutable
 import kafka.message.{NoCompressionCodec, CompressionCodec}
 import org.I0Itec.zkclient.ZkClient
+import joptsimple.{OptionSpec, OptionSet, OptionParser}
+
 
 /**
  * Helper functions!
  */
 object Utils extends Logging {
-  
   /**
    * Wrap the given function in a java.lang.Runnable
    * @param fun A function
@@ -657,6 +658,16 @@ object Utils extends Logging {
       case _ => // swallow
     }
   }
+
+  def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
+    for(arg <- required) {
+      if(!options.has(arg)) {
+        error("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+  }
 }
 
 class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala Sat Apr  7 00:04:51 2012
@@ -243,17 +243,11 @@ object ZkUtils extends Logging {
     getChildren(zkClient, dirs.consumerRegistryDir)
   }
 
-  def getTopicCount(zkClient: ZkClient, group: String, consumerId: String) : TopicCount = {
-    val dirs = new ZKGroupDirs(group)
-    val topicCountJson = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
-    TopicCount.constructTopicCount(consumerId, topicCountJson)
-  }
-
   def getConsumerTopicMaps(zkClient: ZkClient, group: String): Map[String, TopicCount] = {
     val dirs = new ZKGroupDirs(group)
     val consumersInGroup = getConsumersInGroup(zkClient, group)
     val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,
-      ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)))
+      ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId), zkClient))
     consumersInGroup.zip(topicCountMaps).toMap
   }
 
@@ -262,8 +256,8 @@ object ZkUtils extends Logging {
     val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir)
     val consumersPerTopicMap = new mutable.HashMap[String, List[String]]
     for (consumer <- consumers) {
-      val topicCount = getTopicCount(zkClient, group, consumer)
-      for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic()) {
+      val topicCount = TopicCount.constructTopicCount(group, consumer, zkClient)
+      for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) {
         for (consumerThreadId <- consumerThreadIdSet)
           consumersPerTopicMap.get(topic) match {
             case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)

Modified: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala Sat Apr  7 00:04:51 2012
@@ -56,13 +56,13 @@ object TestZKConsumerOffsets {
   }
 }
 
-private class ConsumerThread(stream: KafkaMessageStream[Message]) extends Thread {
+private class ConsumerThread(stream: KafkaStream[Message]) extends Thread {
   val shutdownLatch = new CountDownLatch(1)
 
   override def run() {
     println("Starting consumer thread..")
-    for (message <- stream) {
-      println("consumed: " + Utils.toString(message.payload, "UTF-8"))
+    for (messageAndMetadata <- stream) {
+      println("consumed: " + Utils.toString(messageAndMetadata.message.payload, "UTF-8"))
     }
     shutdownLatch.countDown
     println("thread shutdown !" )

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala Sat Apr  7 00:04:51 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+
+import junit.framework.Assert._
+import org.scalatest.junit.JUnitSuite
+import org.junit.Test
+
+
+class TopicFilterTest extends JUnitSuite {
+
+  @Test
+  def testWhitelists() {
+
+    val topicFilter1 = new Whitelist("white1,white2")
+    assertFalse(topicFilter1.requiresTopicEventWatcher)
+    assertTrue(topicFilter1.isTopicAllowed("white2"))
+    assertFalse(topicFilter1.isTopicAllowed("black1"))
+
+    val topicFilter2 = new Whitelist(".+")
+    assertTrue(topicFilter2.requiresTopicEventWatcher)
+    assertTrue(topicFilter2.isTopicAllowed("alltopics"))
+    
+    val topicFilter3 = new Whitelist("white_listed-topic.+")
+    assertTrue(topicFilter3.requiresTopicEventWatcher)
+    assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1"))
+    assertFalse(topicFilter3.isTopicAllowed("black1"))
+  }
+
+  @Test
+  def testBlacklists() {
+    val topicFilter1 = new Blacklist("black1")
+    assertTrue(topicFilter1.requiresTopicEventWatcher)
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Sat Apr  7 00:04:51 2012
@@ -234,7 +234,7 @@ class ZookeeperConsumerConnectorTest ext
         val iterator = messageStream.iterator
         for (i <- 0 until nMessages * 2) {
           assertTrue(iterator.hasNext())
-          val message = iterator.next()
+          val message = iterator.next().message
           receivedMessages ::= message
           debug("received message: " + message)
         }
@@ -270,14 +270,14 @@ class ZookeeperConsumerConnectorTest ext
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 
-  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[Message]]]): List[Message]= {
+  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaStream[Message]]]): List[Message]= {
     var messages: List[Message] = Nil
     for ((topic, messageStreams) <- topicMessageStreams) {
       for (messageStream <- messageStreams) {
         val iterator = messageStream.iterator
         for (i <- 0 until nMessagesPerThread) {
           assertTrue(iterator.hasNext)
-          val message = iterator.next
+          val message = iterator.next.message
           messages ::= message
           debug("received message: " + Utils.toString(message.payload, "UTF-8"))
         }

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Sat Apr  7 00:04:51 2012
@@ -56,7 +56,7 @@ class FetcherTest extends JUnit3Suite wi
     super.setUp
     fetcher = new Fetcher(new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), null)
     fetcher.stopConnectionsToAllBrokers
-    fetcher.startConnections(topicInfos, cluster, null)
+    fetcher.startConnections(topicInfos, cluster)
   }
 
   override def tearDown() {

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Sat Apr  7 00:04:51 2012
@@ -26,9 +26,10 @@ import kafka.utils.{TestZKUtils, TestUti
 import org.scalatest.junit.JUnit3Suite
 import scala.collection.JavaConversions._
 import kafka.javaapi.message.ByteBufferMessageSet
-import kafka.consumer.{ConsumerConfig, KafkaMessageStream}
 import org.apache.log4j.{Level, Logger}
 import kafka.message.{NoCompressionCodec, CompressionCodec, Message}
+import kafka.consumer.{KafkaStream, ConsumerConfig}
+
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
 
@@ -91,7 +92,7 @@ class ZookeeperConsumerConnectorTest ext
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 
-  def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaMessageStream[Message]]])
+  def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[Message]]])
   : List[Message]= {
     var messages: List[Message] = Nil
     val topicMessageStreams = asMap(jTopicMessageStreams)
@@ -100,7 +101,7 @@ class ZookeeperConsumerConnectorTest ext
         val iterator = messageStream.iterator
         for (i <- 0 until nMessagesPerThread) {
           assertTrue(iterator.hasNext)
-          val message = iterator.next
+          val message = iterator.next.message
           messages ::= message
           debug("received message: " + Utils.toString(message.payload, "UTF-8"))
         }

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala Sat Apr  7 00:04:51 2012
@@ -21,6 +21,7 @@ import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 
+
 class UtilsTest extends JUnitSuite {
   
   private val logger = Logger.getLogger(classOf[UtilsTest]) 
@@ -29,5 +30,5 @@ class UtilsTest extends JUnitSuite {
   def testSwallow() {
     Utils.swallow(logger.info, throw new IllegalStateException("test"))
   }
-  
+
 }

Modified: incubator/kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java (original)
+++ incubator/kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java Sat Apr  7 00:04:51 2012
@@ -16,16 +16,17 @@
  */
 package kafka.examples;
 
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaMessageStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.Message;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.Message;
+
 
 public class Consumer extends Thread
 {
@@ -55,10 +56,10 @@ public class Consumer extends Thread
   public void run() {
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
     topicCountMap.put(topic, new Integer(1));
-    Map<String, List<KafkaMessageStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
-    KafkaMessageStream<Message> stream =  consumerMap.get(topic).get(0);
+    Map<String, List<KafkaStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+    KafkaStream<Message> stream =  consumerMap.get(topic).get(0);
     ConsumerIterator<Message> it = stream.iterator();
     while(it.hasNext())
-      System.out.println(ExampleUtils.getMessage(it.next()));
+      System.out.println(ExampleUtils.getMessage(it.next().message()));
   }
 }

Modified: incubator/kafka/trunk/examples/src/main/java/kafka/examples/ExampleUtils.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/examples/src/main/java/kafka/examples/ExampleUtils.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/examples/src/main/java/kafka/examples/ExampleUtils.java (original)
+++ incubator/kafka/trunk/examples/src/main/java/kafka/examples/ExampleUtils.java Sat Apr  7 00:04:51 2012
@@ -16,8 +16,8 @@
  */
 package kafka.examples;
 
-import java.nio.ByteBuffer;
 
+import java.nio.ByteBuffer;
 import kafka.message.Message;
 
 public class ExampleUtils

Modified: incubator/kafka/trunk/examples/src/main/java/kafka/examples/Producer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/examples/src/main/java/kafka/examples/Producer.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/examples/src/main/java/kafka/examples/Producer.java (original)
+++ incubator/kafka/trunk/examples/src/main/java/kafka/examples/Producer.java Sat Apr  7 00:04:51 2012
@@ -16,9 +16,10 @@
  */
 package kafka.examples;
 
+
+import java.util.Properties;
 import kafka.javaapi.producer.ProducerData;
 import kafka.producer.ProducerConfig;
-import java.util.Properties;
 
 public class Producer extends Thread
 {

Modified: incubator/kafka/trunk/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java (original)
+++ incubator/kafka/trunk/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java Sat Apr  7 00:04:51 2012
@@ -16,17 +16,14 @@
  */
 package kafka.examples;
 
+
 import java.util.ArrayList;
 import java.util.List;
-
+import kafka.api.FetchRequest;
 import kafka.javaapi.MultiFetchResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.MessageAndOffset;
-import scala.collection.Iterator;
-
-import kafka.api.FetchRequest;
-import kafka.message.Message;
 
 
 public class SimpleConsumerDemo

Modified: incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala (original)
+++ incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala Sat Apr  7 00:04:51 2012
@@ -17,15 +17,12 @@
 
 package kafka.perf
 
-import java.net.URI
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicLong
 import java.nio.channels.ClosedByInterruptException
-import joptsimple._
 import org.apache.log4j.Logger
 import kafka.message.Message
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZKStringSerializer, Utils}
+import kafka.utils.Utils
 import java.util.{Random, Properties}
 import kafka.consumer._
 import java.text.SimpleDateFormat
@@ -139,7 +136,7 @@ object ConsumerPerformance {
     val hideHeader = options.has(hideHeaderOpt)
   }
 
-  class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaMessageStream[Message],
+  class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Message],
                            config:ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
     extends Thread(name) {
     private val shutdownLatch = new CountDownLatch(1)
@@ -157,9 +154,9 @@ object ConsumerPerformance {
       var lastMessagesRead = 0L
 
       try {
-        for (message <- stream if messagesRead < config.numMessages) {
+        for (messageAndMetadata <- stream if messagesRead < config.numMessages) {
           messagesRead += 1
-          bytesRead += message.payloadSize
+          bytesRead += messageAndMetadata.message.payloadSize
 
           if (messagesRead % config.reportingInterval == 0) {
             if(config.showDetailedStats)

Modified: incubator/kafka/trunk/perf/src/main/scala/kafka/perf/PerfConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/scala/kafka/perf/PerfConfig.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/perf/src/main/scala/kafka/perf/PerfConfig.scala (original)
+++ incubator/kafka/trunk/perf/src/main/scala/kafka/perf/PerfConfig.scala Sat Apr  7 00:04:51 2012
@@ -18,7 +18,7 @@
 package kafka.perf
 
 import joptsimple.OptionParser
-import java.text.SimpleDateFormat
+
 
 class PerfConfig(args: Array[String]) {
   val parser = new OptionParser

Modified: incubator/kafka/trunk/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala (original)
+++ incubator/kafka/trunk/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala Sat Apr  7 00:04:51 2012
@@ -18,9 +18,7 @@
 package kafka.perf
 
 import java.net.URI
-import joptsimple._
 import kafka.utils._
-import kafka.server._
 import kafka.consumer.SimpleConsumer
 import org.apache.log4j.Logger
 import kafka.api.{OffsetRequest, FetchRequest}

Added: incubator/kafka/trunk/system_test/mirror_maker/README
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/README?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/README (added)
+++ incubator/kafka/trunk/system_test/mirror_maker/README Sat Apr  7 00:04:51 2012
@@ -0,0 +1,22 @@
+This test replicates messages from two source kafka clusters into one target
+kafka cluster using the mirror-maker tool.  At the end, the messages produced
+at the source brokers should match those at the target brokers.
+
+To run this test, do
+bin/run-test.sh
+
+In the event of failure, by default the brokers and zookeepers remain running
+to make it easier to debug the issue - hit Ctrl-C to shut them down. You can
+change this behavior by setting the action_on_fail flag in the script to "exit"
+or "proceed", in which case a snapshot of all the logs and directories is
+placed in the test's base directory.
+
+It is a good idea to run the test in a loop. E.g.:
+
+:>/tmp/mirrormaker_test.log
+for i in {1..10}; do echo "run $i"; ./bin/run-test.sh >> /tmp/mirrormaker_test.log 2>&1; done
+tail -F /tmp/mirrormaker_test.log
+
+grep -ic passed /tmp/mirrormaker_test.log
+grep -ic failed /tmp/mirrormaker_test.log
+

Added: incubator/kafka/trunk/system_test/mirror_maker/bin/expected.out
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/bin/expected.out?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/bin/expected.out (added)
+++ incubator/kafka/trunk/system_test/mirror_maker/bin/expected.out Sat Apr  7 00:04:51 2012
@@ -0,0 +1,18 @@
+start the servers ...
+start producing messages ...
+wait for consumer to finish consuming ...
+[2011-05-17 14:49:11,605] INFO Creating async producer for broker id = 2 at localhost:9091 (kafka.producer.ProducerPool)
+[2011-05-17 14:49:11,606] INFO Creating async producer for broker id = 1 at localhost:9092 (kafka.producer.ProducerPool)
+[2011-05-17 14:49:11,607] INFO Creating async producer for broker id = 3 at localhost:9090 (kafka.producer.ProducerPool)
+thread 0: 400000 messages sent 3514012.1233 nMsg/sec 3.3453 MBs/sec
+[2011-05-17 14:49:34,382] INFO Closing all async producers (kafka.producer.ProducerPool)
+[2011-05-17 14:49:34,383] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+[2011-05-17 14:49:34,384] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+[2011-05-17 14:49:34,385] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+Total Num Messages: 400000 bytes: 79859641 in 22.93 secs
+Messages/sec: 17444.3960
+MB/sec: 3.3214
+test passed
+stopping the servers
+bin/../../../bin/zookeeper-server-start.sh: line 9: 22584 Terminated              $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
+bin/../../../bin/zookeeper-server-start.sh: line 9: 22585 Terminated              $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@

Added: incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh (added)
+++ incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh Sat Apr  7 00:04:51 2012
@@ -0,0 +1,357 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+readonly num_messages=10000
+readonly message_size=100
+readonly action_on_fail="proceed"
+# readonly action_on_fail="wait"
+
+readonly test_start_time="$(date +%s)"
+
+readonly base_dir=$(dirname $0)/..
+
+info() {
+    echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
+}
+
+kill_child_processes() {
+    isTopmost=$1
+    curPid=$2
+    childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}')
+    for childPid in $childPids
+    do
+        kill_child_processes 0 $childPid
+    done
+    if [ $isTopmost -eq 0 ]; then
+        kill -15 $curPid 2> /dev/null
+    fi
+}
+
+cleanup() {
+    info "cleaning up"
+
+    pid_zk_source1=
+    pid_zk_source2=
+    pid_zk_target=
+    pid_kafka_source_1_1=
+    pid_kafka_source_1_2=
+    pid_kafka_source_2_1=
+    pid_kafka_source_2_2=
+    pid_kafka_target_1_1=
+    pid_kafka_target_1_2=
+    pid_producer=
+    pid_mirrormaker_1=
+    pid_mirrormaker_2=
+
+    rm -rf /tmp/zookeeper*
+
+    rm -rf /tmp/kafka*
+}
+
+begin_timer() {
+    t_begin=$(date +%s)
+}
+
+end_timer() {
+    t_end=$(date +%s)
+}
+
+start_zk() {
+    info "starting zookeepers"
+    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source_1.properties 2>&1 > $base_dir/zookeeper_source-1.log &
+    pid_zk_source1=$!
+    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source_2.properties 2>&1 > $base_dir/zookeeper_source-2.log &
+    pid_zk_source2=$!
+    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log &
+    pid_zk_target=$!
+}
+
+start_source_servers() {
+    info "starting source cluster"
+
+    JMX_PORT=1111 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_1_1.properties 2>&1 > $base_dir/kafka_source-1-1.log &
+    pid_kafka_source_1_1=$!
+    JMX_PORT=2222 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_1_2.properties 2>&1 > $base_dir/kafka_source-1-2.log &
+    pid_kafka_source_1_2=$!
+    JMX_PORT=3333 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_2_1.properties 2>&1 > $base_dir/kafka_source-2-1.log &
+    pid_kafka_source_2_1=$!
+    JMX_PORT=4444 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_2_2.properties 2>&1 > $base_dir/kafka_source-2-2.log &
+    pid_kafka_source_2_2=$!
+}
+
+start_target_servers() {
+    info "starting mirror cluster"
+    JMX_PORT=5555 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target_1_1.properties 2>&1 > $base_dir/kafka_target-1-1.log &
+    pid_kafka_target_1_1=$!
+    JMX_PORT=6666 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target_1_2.properties 2>&1 > $base_dir/kafka_target-1-2.log &
+    pid_kafka_target_1_2=$!
+}
+
+shutdown_servers() {
+    info "stopping mirror-maker"
+    if [ "x${pid_mirrormaker_1}" != "x" ]; then kill_child_processes 0 ${pid_mirrormaker_1}; fi
+    # sleep to avoid rebalancing during shutdown
+    sleep 2
+    if [ "x${pid_mirrormaker_2}" != "x" ]; then kill_child_processes 0 ${pid_mirrormaker_2}; fi
+
+    info "stopping producer"
+    if [ "x${pid_producer}" != "x" ]; then kill_child_processes 0 ${pid_producer}; fi
+
+    info "shutting down target servers"
+    if [ "x${pid_kafka_target_1_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target_1_1}; fi
+    if [ "x${pid_kafka_target_1_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target_1_2}; fi
+    sleep 2
+
+    info "shutting down source servers"
+    if [ "x${pid_kafka_source_1_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_1_1}; fi
+    if [ "x${pid_kafka_source_1_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_1_2}; fi
+    if [ "x${pid_kafka_source_2_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_2_1}; fi
+    if [ "x${pid_kafka_source_2_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_2_2}; fi
+
+    info "shutting down zookeeper servers"
+    if [ "x${pid_zk_target}" != "x" ]; then kill_child_processes 0 ${pid_zk_target}; fi
+    if [ "x${pid_zk_source1}" != "x" ]; then kill_child_processes 0 ${pid_zk_source1}; fi
+    if [ "x${pid_zk_source2}" != "x" ]; then kill_child_processes 0 ${pid_zk_source2}; fi
+}
+
+start_producer() {
+    topic=$1
+    zk=$2
+    info "start producing messages for topic $topic to zookeeper $zk ..."
+    $base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo zk.connect=$zk --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log &
+    pid_producer=$!
+}
+
+# Usage: wait_partition_done ([kafka-server] [topic] [partition-id])+
+wait_partition_done() {
+    n_tuples=$(($# / 3))
+
+    i=1
+    while (($#)); do
+        kafka_server[i]=$1
+        topic[i]=$2
+        partitionid[i]=$3
+        prev_offset[i]=0
+        info "\twaiting for partition on server ${kafka_server[i]}, topic ${topic[i]}, partition ${partitionid[i]}"
+        i=$((i+1))
+        shift 3
+    done
+
+    all_done=0
+
+    # set -x
+    while [[ $all_done != 1 ]]; do
+        sleep 4
+        i=$n_tuples
+        all_done=1
+        for ((i=1; i <= $n_tuples; i++)); do
+            cur_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server ${kafka_server[i]} --topic ${topic[i]} --partition ${partitionid[i]} --time -1 --offsets 1 | tail -1)
+            if [ "x$cur_size" != "x${prev_offset[i]}" ]; then
+                all_done=0
+                prev_offset[i]=$cur_size
+            fi
+        done
+    done
+
+}
+
+cmp_logs() {
+    topic=$1
+    info "comparing source and target logs for topic $topic"
+    source_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    source_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    source_part2_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    source_part3_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    target_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    target_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9095 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    if [ "x$source_part0_size" == "x" ]; then source_part0_size=0; fi
+    if [ "x$source_part1_size" == "x" ]; then source_part1_size=0; fi
+    if [ "x$source_part2_size" == "x" ]; then source_part2_size=0; fi
+    if [ "x$source_part3_size" == "x" ]; then source_part3_size=0; fi
+    if [ "x$target_part0_size" == "x" ]; then target_part0_size=0; fi
+    if [ "x$target_part1_size" == "x" ]; then target_part1_size=0; fi
+    expected_size=$(($source_part0_size + $source_part1_size + $source_part2_size + $source_part3_size))
+    actual_size=$(($target_part0_size + $target_part1_size))
+    if [ "x$expected_size" != "x$actual_size" ]
+    then
+        info "source size: $expected_size target size: $actual_size"
+        return 1
+    else
+        return 0
+    fi
+}
+
+take_fail_snapshot() {
+    snapshot_dir="$base_dir/failed-${snapshot_prefix}-${test_start_time}"
+    mkdir $snapshot_dir
+    for dir in /tmp/zookeeper_source{1..2} /tmp/zookeeper_target /tmp/kafka-source-{1..2}-{1..2}-logs /tmp/kafka-target{1..2}-logs; do
+        if [ -d $dir ]; then
+            cp -r $dir $snapshot_dir
+        fi
+    done
+}
+
+# Usage: process_test_result <result> <action_on_fail>
+# result: last test result
+# action_on_fail: (exit|wait|proceed)
+# ("wait" is useful if you want to troubleshoot using zookeeper)
+process_test_result() {
+    result=$1
+    if [ $1 -eq 0 ]; then
+        info "test passed"
+    else
+        info "test failed"
+        case "$2" in
+            "wait") info "waiting: hit Ctrl-c to quit"
+                wait
+                ;;
+            "exit") shutdown_servers
+                take_fail_snapshot
+                exit $result
+                ;;
+            *) shutdown_servers
+                take_fail_snapshot
+                info "proceeding"
+                ;;
+        esac
+    fi
+}
+
+test_whitelists() {
+    info "### Testing whitelists"
+    snapshot_prefix="whitelist-test"
+
+    cleanup
+    start_zk
+    start_source_servers
+    start_target_servers
+    sleep 4
+
+    info "starting mirror makers"
+    JMX_PORT=7777 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/whitelisttest_1.consumer.properties --consumer-config $base_dir/config/whitelisttest_2.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num-streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
+    pid_mirrormaker_1=$!
+    JMX_PORT=8888 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/whitelisttest_1.consumer.properties --consumer-config $base_dir/config/whitelisttest_2.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num-streams 2 2>&1 > $base_dir/kafka_mirrormaker_2.log &
+    pid_mirrormaker_2=$!
+
+    begin_timer
+
+    start_producer whitetopic01 localhost:2181
+    start_producer whitetopic01 localhost:2182
+    info "waiting for whitetopic01 producers to finish producing ..."
+    wait_partition_done kafka://localhost:9090 whitetopic01 0 kafka://localhost:9091 whitetopic01 0 kafka://localhost:9092 whitetopic01 0 kafka://localhost:9093 whitetopic01 0
+
+    start_producer whitetopic02 localhost:2181
+    start_producer whitetopic03 localhost:2181
+    start_producer whitetopic04 localhost:2182
+    info "waiting for whitetopic02,whitetopic03,whitetopic04 producers to finish producing ..."
+    wait_partition_done kafka://localhost:9090 whitetopic02 0 kafka://localhost:9091 whitetopic02 0 kafka://localhost:9090 whitetopic03 0 kafka://localhost:9091 whitetopic03 0 kafka://localhost:9092 whitetopic04 0 kafka://localhost:9093 whitetopic04 0
+
+    start_producer blacktopic01 localhost:2182
+    info "waiting for blacktopic01 producer to finish producing ..."
+    wait_partition_done kafka://localhost:9092 blacktopic01 0 kafka://localhost:9093 blacktopic01 0
+
+    info "waiting for consumer to finish consuming ..."
+
+    wait_partition_done kafka://localhost:9094 whitetopic01 0 kafka://localhost:9095 whitetopic01 0 kafka://localhost:9094 whitetopic02 0 kafka://localhost:9095 whitetopic02 0 kafka://localhost:9094 whitetopic03 0 kafka://localhost:9095 whitetopic03 0 kafka://localhost:9094 whitetopic04 0 kafka://localhost:9095 whitetopic04 0
+
+    end_timer
+    info "embedded consumer took $((t_end - t_begin)) seconds"
+
+    sleep 2
+
+    # if [[ -d /tmp/kafka-target-1-1-logs/blacktopic01 || /tmp/kafka-target-1-2-logs/blacktopic01 ]]; then
+    #     echo "blacktopic01 found on target cluster"
+    #     result=1
+    # else
+    #     cmp_logs whitetopic01 && cmp_logs whitetopic02 && cmp_logs whitetopic03 && cmp_logs whitetopic04
+    #     result=$?
+    # fi
+
+    cmp_logs blacktopic01
+
+    cmp_logs whitetopic01 && cmp_logs whitetopic02 && cmp_logs whitetopic03 && cmp_logs whitetopic04
+    result=$?
+
+    return $result
+}
+
+test_blacklists() {
+    info "### Testing blacklists"
+    snapshot_prefix="blacklist-test"
+    cleanup
+    start_zk
+    start_source_servers
+    start_target_servers
+    sleep 4
+
+    info "starting mirror maker"
+    $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/blacklisttest.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --blacklist="black.*" --num-streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
+    pid_mirrormaker_1=$!
+
+    start_producer blacktopic01 localhost:2181
+    start_producer blacktopic02 localhost:2181
+    info "waiting for producer to finish producing blacktopic01,blacktopic02 ..."
+    wait_partition_done kafka://localhost:9090 blacktopic01 0 kafka://localhost:9091 blacktopic01 0 kafka://localhost:9090 blacktopic02 0 kafka://localhost:9091 blacktopic02 0
+
+    begin_timer
+
+    start_producer whitetopic01 localhost:2181
+    info "waiting for producer to finish producing whitetopic01 ..."
+    wait_partition_done kafka://localhost:9090 whitetopic01 0 kafka://localhost:9091 whitetopic01 0
+
+    info "waiting for consumer to finish consuming ..."
+    wait_partition_done kafka://localhost:9094 whitetopic01 0 kafka://localhost:9095 whitetopic01 0
+
+    end_timer
+
+    info "embedded consumer took $((t_end - t_begin)) seconds"
+
+    sleep 2
+
+    cmp_logs blacktopic01 || cmp_logs blacktopic02
+    if [ $? -eq 0 ]; then
+        return 1
+    fi
+    
+    cmp_logs whitetopic01
+    return $?
+}
+
+# main test begins
+
+echo "Test-$test_start_time"
+
+# Ctrl-c trap. Catches INT signal
+trap "shutdown_servers; exit 0" INT
+
+test_whitelists
+result=$?
+
+process_test_result $result $action_on_fail
+
+shutdown_servers
+ 
+sleep 2
+ 
+test_blacklists
+result=$?
+
+process_test_result $result $action_on_fail
+
+shutdown_servers
+
+exit $result
+

Added: incubator/kafka/trunk/system_test/mirror_maker/config/blacklisttest.consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/config/blacklisttest.consumer.properties?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/config/blacklisttest.consumer.properties (added)
+++ incubator/kafka/trunk/system_test/mirror_maker/config/blacklisttest.consumer.properties Sat Apr  7 00:04:51 2012
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=group1
+shallowiterator.enable=true
+

Added: incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties (added)
+++ incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties Sat Apr  7 00:04:51 2012
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2183
+# broker.list=1:localhost:9094,2:localhost:9095
+
+# timeout in ms for connecting to zookeeper
+# zk.connectiontimeout.ms=1000000
+
+producer.type=async
+
+# to avoid dropping events if the queue is full, wait indefinitely
+queue.enqueueTimeout.ms=-1
+

Added: incubator/kafka/trunk/system_test/mirror_maker/config/server_source_1_1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/config/server_source_1_1.properties?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/config/server_source_1_1.properties (added)
+++ incubator/kafka/trunk/system_test/mirror_maker/config/server_source_1_1.properties Sat Apr  7 00:04:51 2012
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=1
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9090
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source-1-1-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=10000000
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flasher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+

Added: incubator/kafka/trunk/system_test/mirror_maker/config/server_source_1_2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/config/server_source_1_2.properties?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/config/server_source_1_2.properties (added)
+++ incubator/kafka/trunk/system_test/mirror_maker/config/server_source_1_2.properties Sat Apr  7 00:04:51 2012
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=2
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9091
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source-1-2-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+

Added: incubator/kafka/trunk/system_test/mirror_maker/config/server_source_2_1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/config/server_source_2_1.properties?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/config/server_source_2_1.properties (added)
+++ incubator/kafka/trunk/system_test/mirror_maker/config/server_source_2_1.properties Sat Apr  7 00:04:51 2012
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=1
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9092
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source-2-1-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2182
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+

Added: incubator/kafka/trunk/system_test/mirror_maker/config/server_source_2_2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/config/server_source_2_2.properties?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/config/server_source_2_2.properties (added)
+++ incubator/kafka/trunk/system_test/mirror_maker/config/server_source_2_2.properties Sat Apr  7 00:04:51 2012
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=2
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9093
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source-2-2-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2182
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+

Added: incubator/kafka/trunk/system_test/mirror_maker/config/server_target_1_1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/config/server_target_1_1.properties?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/config/server_target_1_1.properties (added)
+++ incubator/kafka/trunk/system_test/mirror_maker/config/server_target_1_1.properties Sat Apr  7 00:04:51 2012
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=1
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9094
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-target-1-1-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2183
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4

Added: incubator/kafka/trunk/system_test/mirror_maker/config/server_target_1_2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/config/server_target_1_2.properties?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/config/server_target_1_2.properties (added)
+++ incubator/kafka/trunk/system_test/mirror_maker/config/server_target_1_2.properties Sat Apr  7 00:04:51 2012
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=2
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9095
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-target-1-2-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2183
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4

Added: incubator/kafka/trunk/system_test/mirror_maker/config/whitelisttest_1.consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/config/whitelisttest_1.consumer.properties?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/config/whitelisttest_1.consumer.properties (added)
+++ incubator/kafka/trunk/system_test/mirror_maker/config/whitelisttest_1.consumer.properties Sat Apr  7 00:04:51 2012
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=group1
+shallowiterator.enable=true
+

Added: incubator/kafka/trunk/system_test/mirror_maker/config/whitelisttest_2.consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/config/whitelisttest_2.consumer.properties?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/config/whitelisttest_2.consumer.properties (added)
+++ incubator/kafka/trunk/system_test/mirror_maker/config/whitelisttest_2.consumer.properties Sat Apr  7 00:04:51 2012
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2182
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=group1
+shallowiterator.enable=true
+

Added: incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_source_1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_source_1.properties?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_source_1.properties (added)
+++ incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_source_1.properties Sat Apr  7 00:04:51 2012
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper_source-1
+# the port at which the clients will connect
+clientPort=2181

Added: incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_source_2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_source_2.properties?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_source_2.properties (added)
+++ incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_source_2.properties Sat Apr  7 00:04:51 2012
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper_source-2
+# the port at which the clients will connect
+clientPort=2182

Added: incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_target.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_target.properties?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_target.properties (added)
+++ incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_target.properties Sat Apr  7 00:04:51 2012
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper_target
+# the port at which the clients will connect
+clientPort=2183



Mime
View raw message