Added: incubator/kafka/trunk/perf/src/main/java/kafka/perf/PerfTimer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/java/kafka/perf/PerfTimer.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/src/main/java/kafka/perf/PerfTimer.java (added)
+++ incubator/kafka/trunk/perf/src/main/java/kafka/perf/PerfTimer.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,149 @@
+package kafka.perf;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+
+import kafka.message.NoCompressionCodec;
+import kafka.perf.jmx.BrokerJmxClient;
+
+/**
+ * Timer thread of a perf-simulator run. It sleeps for {@code timeToRun}
+ * milliseconds, prints a summary to stdout, appends the results to CSV files
+ * below the report directory and finally terminates the JVM.
+ */
+public class PerfTimer extends Thread
+{
+  private final long timeToRun;
+  private final BrokerJmxClient brokerStats;
+  private final KafkaPerfSimulator perfSim;
+  private final int numConsumers, numProducer, numParts, numTopic;
+  private final String reportFile;
+  private final int compression;
+
+  /**
+   * @param brokerStats  JMX client queried for the broker read/write rates
+   * @param perfSim      simulator that supplies the averaged client-side numbers
+   * @param numConsumers number of consumers (reported, and used to scale totals)
+   * @param numProducer  number of producers (reported, and used to scale totals)
+   * @param numParts     number of partitions (reported only)
+   * @param numTopic     number of topics (reported only)
+   * @param timeToRun    time to sleep before reporting, in milliseconds
+   * @param fileName     directory the CSV report files are written into
+   * @param compression  compression codec id; the compression-ratio report is
+   *                     skipped when it equals {@code NoCompressionCodec.codec()}
+   */
+  public PerfTimer(BrokerJmxClient brokerStats,
+                   KafkaPerfSimulator perfSim, int numConsumers,
+                   int numProducer, int numParts, int numTopic,
+                   long timeToRun,
+                   String fileName, int compression)
+  {
+    super("PerfTimer");
+    this.timeToRun = timeToRun;
+    this.brokerStats = brokerStats;
+    this.perfSim = perfSim;
+    this.numConsumers = numConsumers;
+    this.numProducer = numProducer;
+    this.numParts = numParts;
+    this.numTopic = numTopic;
+    this.compression = compression;
+    reportFile = fileName;
+  }
+
+  /**
+   * Returns a {@link File} for {@code name}, creating its parent directories
+   * when the file does not exist yet. The file itself is not created.
+   */
+  protected File openReportFile(String name) throws Exception
+  {
+    File file = new File(name);
+    if (!file.exists()) {
+      if (file.getParentFile() != null) {
+        file.getParentFile().mkdirs();
+      }
+    }
+    return file;
+  }
+
+  /**
+   * Appends one row to a CSV file, writing {@code header} first when the file
+   * did not exist before this call. The writer is closed even if a write fails.
+   */
+  private void appendRow(File file, String header, String row) throws Exception
+  {
+    // Must be evaluated before FileWriter creates the file.
+    boolean writeHeader = !file.exists();
+    BufferedWriter writer = new BufferedWriter(new FileWriter(file, true));
+    try {
+      if (writeHeader) {
+        writer.write(header + "\n");
+      }
+      writer.write(row);
+      writer.newLine();
+    } finally {
+      writer.close();
+    }
+  }
+
+  /** Total length in bytes of a file, or of all files below a directory (recursively). */
+  private static long sizeOf(File path)
+  {
+    if (!path.isDirectory()) {
+      return path.length();
+    }
+    long total = 0L;
+    File[] children = path.listFiles();
+    // listFiles() returns null when the directory cannot be read.
+    if (children != null) {
+      for (File child : children) {
+        total += sizeOf(child);
+      }
+    }
+    return total;
+  }
+
+  /** Appends the per-client and total MB/sec figures to {@code MBdata.csv}. */
+  public void printMBDataStats() throws Exception
+  {
+    String header = perfSim.getXaxisLabel() + ",consumer-MB/sec,total-consumer-MB/sec,producer-MB/sec, total-producer-MB/sec";
+    String row = perfSim.getXAxisVal() + "," + perfSim.getAvgMBytesRecPs() + "," + (numConsumers * perfSim.getAvgMBytesRecPs()) +
+                 "," + perfSim.getAvgMBytesSentPs() + "," + (perfSim.getAvgMBytesSentPs() * numProducer);
+    appendRow(openReportFile(reportFile + "/MBdata.csv"), header, row);
+  }
+
+  /** Appends the per-client and total messages/sec figures to {@code NumMessage.csv}. */
+  public void printMessageDataStats() throws Exception
+  {
+    String header = perfSim.getXaxisLabel() + ",consumer-messages/sec,total-consumer-messages/sec,producer-messages/sec, total-producer-messages/sec";
+    String row = perfSim.getXAxisVal() + "," + perfSim.getAvgMessagesRecPs() + "," + (numConsumers * perfSim.getAvgMessagesRecPs()) +
+                 "," + perfSim.getAvgMessagesSentPs() + "," + (perfSim.getAvgMessagesSentPs() * numProducer);
+    appendRow(openReportFile(reportFile + "/NumMessage.csv"), header, row);
+  }
+
+  /**
+   * Prints the summary header and data line to stdout, then writes the CSV
+   * reports. The compression-ratio report is only written when a compression
+   * codec is in use.
+   */
+  public void printReport() throws Exception
+  {
+    String header = "#consumers, #of producers, #of partitions, #of topic, " +
+        "consumer mess/sec,consumer MB/sec, producer mess/sec,producer MB/sec, broker MB write/sec, broker MB read/sec";
+    String data = numConsumers + "," + numProducer + "," + numParts + "," + numTopic + "," +
+        perfSim.getAvgMessagesRecPs() + "," +
+        perfSim.getAvgMBytesRecPs() + "," +
+        perfSim.getAvgMessagesSentPs() + "," +
+        perfSim.getAvgMBytesSentPs() + "," +
+        brokerStats.getBrokerStats();
+
+    System.out.println(header);
+    System.out.println(data);
+    printMessageDataStats();
+    printMBDataStats();
+    if (compression != NoCompressionCodec.codec()) {
+      printCompressionRatio();
+    }
+  }
+
+  /**
+   * Appends "bytes sent / bytes on disk" to {@code CompressionRatio.csv}.
+   * Does nothing unless the broker URL is "localhost" and a log directory is
+   * configured, because the log files are measured through the local file system.
+   */
+  public void printCompressionRatio() throws Exception
+  {
+    if (!"localhost".equals(perfSim.getKafkaServersURL())) {
+      return;
+    }
+    String logDirName = perfSim.getKafkaServerLogDir();
+    if (logDirName == null) {
+      return;
+    }
+
+    // Recurse into sub-directories: File.length() of a directory is unspecified,
+    // so summing only the direct children would not measure nested log files.
+    long totalLogLength = sizeOf(new File(logDirName));
+    System.out.println("Log length = " + totalLogLength);
+
+    String header = perfSim.getXaxisLabel() + ",Compression Ratio";
+    String row = perfSim.getXAxisVal() + "," + (perfSim.getTotalBytesSent() / (double) totalLogLength);
+    appendRow(openReportFile(reportFile + "/CompressionRatio.csv"), header, row);
+  }
+
+  /**
+   * Sleeps for the configured time, prints the report and exits the JVM with
+   * status 0. The report is still attempted if the sleep is interrupted, and a
+   * failing report does not change the exit status.
+   */
+  public void run() {
+    try
+    {
+      Thread.sleep(timeToRun);
+    }
+    catch (InterruptedException e)
+    {
+      e.printStackTrace();
+    }
+
+    try
+    {
+      printReport();
+    }
+    catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+    System.exit(0);
+  }
+}
Added: incubator/kafka/trunk/perf/src/main/java/kafka/perf/consumer/SimplePerfConsumer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/java/kafka/perf/consumer/SimplePerfConsumer.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/src/main/java/kafka/perf/consumer/SimplePerfConsumer.java (added)
+++ incubator/kafka/trunk/perf/src/main/java/kafka/perf/consumer/SimplePerfConsumer.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.perf.consumer;
+
+import java.lang.Thread;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+
+import kafka.api.FetchRequest;
+import kafka.javaapi.MultiFetchResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+
+/**
+ * Consumer thread for the perf simulator. It multifetches from every partition
+ * of one topic in an endless loop and counts the bytes and messages received.
+ */
+public class SimplePerfConsumer extends Thread
+{
+  private SimpleConsumer simpleConsumer;
+  private String topic;
+  private String consumerName;
+  private int fetchSize;
+  private AtomicLong bytesRec;
+  private AtomicLong messagesRec;
+  // Both hold the creation time in milliseconds; the rates are averaged since then.
+  private AtomicLong lastReportMessageRec;
+  private AtomicLong lastReportBytesRec;
+  // Next fetch offset per partition. A single shared offset would be advanced
+  // by every partition's bytes and be wrong for all of them.
+  private final long[] offsets;
+  private final int numParts;
+
+  /**
+   * @param topic                   topic to fetch from
+   * @param kafkaServerURL          broker host
+   * @param kafkaServerPort         broker port
+   * @param kafkaProducerBufferSize passed to {@code SimpleConsumer} as its last constructor argument
+   * @param connectionTimeOut       passed to {@code SimpleConsumer} as its third constructor argument
+   * @param reconnectInterval       unused
+   * @param fetchSize               fetch size used in every {@code FetchRequest}
+   * @param name                    thread name, also returned by {@link #getConsumerName()}
+   * @param numParts                number of partitions; partitions {@code 0..numParts-1} are fetched
+   */
+  public SimplePerfConsumer(String topic, String kafkaServerURL, int kafkaServerPort,
+                            int kafkaProducerBufferSize, int connectionTimeOut, int reconnectInterval,
+                            int fetchSize, String name, int numParts)
+  {
+    super(name);
+    simpleConsumer = new SimpleConsumer(kafkaServerURL,
+                                        kafkaServerPort,
+                                        connectionTimeOut,
+                                        kafkaProducerBufferSize);
+    this.topic = topic;
+    this.fetchSize = fetchSize;
+    consumerName = name;
+    bytesRec = new AtomicLong(0L);
+    messagesRec = new AtomicLong(0L);
+    lastReportMessageRec = new AtomicLong(System.currentTimeMillis());
+    lastReportBytesRec = new AtomicLong(System.currentTimeMillis());
+    this.numParts = numParts;
+    this.offsets = new long[Math.max(numParts, 0)];
+  }
+
+  /** Fetches from all partitions forever; never returns normally. */
+  public void run() {
+    while(true)
+    {
+      List<FetchRequest> requests = new ArrayList<FetchRequest>();
+      for(int part = 0; part < numParts; part++)
+      {
+        requests.add(new FetchRequest(topic, part, offsets[part], fetchSize));
+      }
+
+      MultiFetchResponse response = simpleConsumer.multifetch(requests);
+      // NOTE(review): assumes the response yields one message set per request,
+      // in request order - confirm against MultiFetchResponse.
+      int index = 0;
+      for (ByteBufferMessageSet messages: response)
+      {
+        if (index < offsets.length)
+        {
+          offsets[index] += messages.validBytes();
+        }
+        index++;
+        bytesRec.getAndAdd(messages.sizeInBytes());
+
+        Iterator<MessageAndOffset> it = messages.iterator();
+        while(it.hasNext())
+        {
+          it.next();
+          messagesRec.getAndIncrement();
+        }
+      }
+    }
+  }
+
+  /** Average messages received per second since construction; 0 if no time has elapsed. */
+  public double getMessagesRecPs()
+  {
+    long elapsedMs = System.currentTimeMillis() - lastReportMessageRec.get();
+    if (elapsedMs <= 0)
+    {
+      return 0.0;
+    }
+    double val = (double)messagesRec.get() / elapsedMs;
+    return val * 1000;
+  }
+
+  /** Name given to this consumer at construction. */
+  public String getConsumerName()
+  {
+    return consumerName;
+  }
+
+  /** Average MB (2^20 bytes) received per second since construction; 0 if no time has elapsed. */
+  public double getMBytesRecPs()
+  {
+    long elapsedMs = System.currentTimeMillis() - lastReportBytesRec.get();
+    if (elapsedMs <= 0)
+    {
+      return 0.0;
+    }
+    double val = ((double)bytesRec.get() / elapsedMs) / (1024*1024);
+    return val * 1000;
+  }
+
+}
Added: incubator/kafka/trunk/perf/src/main/java/kafka/perf/jmx/BrokerJmxClient.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/java/kafka/perf/jmx/BrokerJmxClient.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/src/main/java/kafka/perf/jmx/BrokerJmxClient.java (added)
+++ incubator/kafka/trunk/perf/src/main/java/kafka/perf/jmx/BrokerJmxClient.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,49 @@
+package kafka.perf.jmx;
+
+import javax.management.JMX;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import kafka.network.SocketServerStatsMBean;
+
+public class BrokerJmxClient
+{
+ private final String host;
+ private final int port;
+ private final long time;
+ public BrokerJmxClient(String host, int port,
+ long time)
+ {
+ this.host = host;
+ this.port = port;
+ this.time = time;
+ }
+
+ public MBeanServerConnection getMbeanConnection() throws Exception
+ {
+ JMXServiceURL url =
+ new JMXServiceURL("service:jmx:rmi:///jndi/rmi://"+ host+ ":" + port + "/jmxrmi");
+ JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
+ MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+ return mbsc;
+ }
+
+ public SocketServerStatsMBean createSocketMbean() throws Exception
+ {
+
+ ObjectName mbeanName = new ObjectName("kafka:type=kafka.SocketServerStats");
+ SocketServerStatsMBean stats = JMX.newMBeanProxy(getMbeanConnection(), mbeanName, SocketServerStatsMBean.class, true);
+ return stats;
+ }
+
+ public String getBrokerStats() throws Exception
+ {
+ StringBuffer buf = new StringBuffer();
+ SocketServerStatsMBean stats = createSocketMbean();
+ buf.append(stats.getBytesWrittenPerSecond() / (1024 *1024) + "," + stats.getBytesReadPerSecond() / (1024 *1024) );
+ return buf.toString();
+ }
+}
Added: incubator/kafka/trunk/perf/src/main/java/kafka/perf/producer/Producer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/java/kafka/perf/producer/Producer.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/src/main/java/kafka/perf/producer/Producer.java (added)
+++ incubator/kafka/trunk/perf/src/main/java/kafka/perf/producer/Producer.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.perf.producer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.javaapi.producer.SyncProducer;
+import kafka.producer.SyncProducerConfig;
+
+/**
+ * Producer thread for the perf simulator. It sends batches of fixed-size,
+ * zero-filled messages to random partitions of one topic in an endless loop
+ * and counts the bytes and messages sent.
+ */
+public class Producer extends Thread
+{
+  private final SyncProducer producer;
+  private final String topic;
+  private final int messageSize;
+  private final AtomicLong bytesSent = new AtomicLong(0L);
+  private final AtomicLong messagesSent = new AtomicLong(0L);
+  // Both hold the creation time in milliseconds; the rates are averaged since then.
+  private final AtomicLong lastReportMessageSent = new AtomicLong(System.currentTimeMillis());
+  private final AtomicLong lastReportBytesSent = new AtomicLong(System.currentTimeMillis());
+  private final String producerName;
+  private final int batchSize;
+  private final int numParts;
+  private final int compression;
+
+  /**
+   * @param topic                   topic to send to
+   * @param kafkaServerURL          broker host ("host" property)
+   * @param kafkaServerPort         broker port ("port" property)
+   * @param kafkaProducerBufferSize value of the "buffer.size" property
+   * @param connectionTimeOut       value of the "connect.timeout.ms" property
+   * @param reconnectInterval       value of the "reconnect.interval" property
+   * @param messageSize             payload size of every message, in bytes
+   * @param name                    thread name, also returned by {@link #getProducerName()}
+   * @param batchSize               number of messages per send
+   * @param numParts                number of partitions; each send picks one at random
+   * @param compression             compression codec id passed to {@code getCompressionCodec}
+   */
+  public Producer(String topic, String kafkaServerURL, int kafkaServerPort,
+                  int kafkaProducerBufferSize, int connectionTimeOut, int reconnectInterval,
+                  int messageSize, String name, int batchSize, int numParts, int compression)
+  {
+    super(name);
+    Properties props = new Properties();
+    props.put("host", kafkaServerURL);
+    props.put("port", String.valueOf(kafkaServerPort));
+    props.put("buffer.size", String.valueOf(kafkaProducerBufferSize));
+    props.put("connect.timeout.ms", String.valueOf(connectionTimeOut));
+    props.put("reconnect.interval", String.valueOf(reconnectInterval));
+    producer = new SyncProducer(new SyncProducerConfig(props));
+    this.topic = topic;
+
+    this.messageSize = messageSize;
+    producerName = name;
+    this.batchSize = batchSize;
+    this.numParts = numParts;
+    this.compression = compression;
+  }
+
+  /** Sends batches forever; never returns normally. */
+  public void run() {
+    Random random = new Random();
+    while(true)
+    {
+      List<Message> messageList = new ArrayList<Message>();
+      for(int i = 0; i < batchSize; i++)
+      {
+        Message message = new Message(new byte[messageSize]);
+        messageList.add(message);
+      }
+      ByteBufferMessageSet set = new ByteBufferMessageSet(kafka.message.CompressionCodec$.MODULE$.getCompressionCodec(compression), messageList);
+      producer.send(topic, random.nextInt(numParts), set);
+      // Widen before multiplying: the int product can overflow for large batches.
+      bytesSent.getAndAdd((long) batchSize * messageSize);
+      messagesSent.getAndAdd(messageList.size());
+    }
+  }
+
+  /** Average messages sent per second since construction; 0 if no time has elapsed. */
+  public double getMessagesSentPs()
+  {
+    long elapsedMs = System.currentTimeMillis() - lastReportMessageSent.get();
+    if (elapsedMs <= 0)
+    {
+      return 0.0;
+    }
+    double val = (double)messagesSent.get() / elapsedMs;
+    return val * 1000;
+  }
+
+  /** Name given to this producer at construction. */
+  public String getProducerName()
+  {
+    return producerName;
+  }
+
+  /** Average MB (2^20 bytes) of payload sent per second since construction; 0 if no time has elapsed. */
+  public double getMBytesSentPs()
+  {
+    long elapsedMs = System.currentTimeMillis() - lastReportBytesSent.get();
+    if (elapsedMs <= 0)
+    {
+      return 0.0;
+    }
+    double val = ((double)bytesSent.get() / elapsedMs) / (1024*1024);
+    return val * 1000;
+  }
+
+  /** Total payload bytes sent so far (batch size times message size per send). */
+  public long getTotalBytesSent() {
+    return bytesSent.get();
+  }
+}
Added: incubator/kafka/trunk/perf/util-bin/remote-kafka-env.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/util-bin/remote-kafka-env.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/util-bin/remote-kafka-env.sh (added)
+++ incubator/kafka/trunk/perf/util-bin/remote-kafka-env.sh Mon Aug 1 23:41:24 2011
@@ -0,0 +1,31 @@
+
+# Locations on the remote hosts. The "~" is deliberately left unexpanded here
+# so that it is resolved by the remote shell.
+REMOTE_KAFKA_HOME="~/kafka-perf"
+REMOTE_KAFKA_LOG_DIR="$REMOTE_KAFKA_HOME/tmp/kafka-logs"
+SIMULATOR_SCRIPT="$REMOTE_KAFKA_HOME/perf/run-simulator.sh"
+
+# Host part of the user@host logins supplied by the sourcing script.
+REMOTE_KAFKA_HOST=$(echo $REMOTE_KAFKA_LOGIN | cut -d @ -f 2)
+REMOTE_SIM_HOST=$(echo $REMOTE_SIM_LOGIN | cut -d @ -f 2)
+
+# If we are running the broker on the same box, use the local interface.
+if [[ "$REMOTE_KAFKA_HOST" == "$REMOTE_SIM_HOST" ]]
+then
+    KAFKA_SERVER="localhost"
+else
+    KAFKA_SERVER=$REMOTE_KAFKA_HOST
+fi
+
+
+# todo: some echos
+# todo: talkative sleep
+
+# Starts the broker on the remote Kafka host in the background and waits
+# 10 seconds for it to come up. Both stdout and stderr of the broker go to
+# kafka.out in the remote Kafka home ("> file 2>&1"; the reverse order would
+# leave stderr out of the file).
+function kafka_startup() {
+    ssh $REMOTE_KAFKA_LOGIN "cd $REMOTE_KAFKA_HOME; ./bin/kafka-server-start.sh config/server.properties > kafka.out 2>&1" &
+    sleep 10
+}
+
+
+function kafka_cleanup() {
+ ssh $REMOTE_KAFKA_LOGIN "cd $REMOTE_KAFKA_HOME; ./bin/kafka-server-stop.sh" &
+ sleep 10
+ ssh $REMOTE_KAFKA_LOGIN "rm -rf $REMOTE_KAFKA_LOG_DIR" &
+ sleep 10
+}
Added: incubator/kafka/trunk/perf/util-bin/run-fetchsize-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/util-bin/run-fetchsize-test.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/util-bin/run-fetchsize-test.sh (added)
+++ incubator/kafka/trunk/perf/util-bin/run-fetchsize-test.sh Mon Aug 1 23:41:24 2011
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+REMOTE_KAFKA_LOGIN=$1 # user@host format
+REMOTE_SIM_LOGIN=$2
+TEST_TIME=$3
+REPORT_FILE=$4
+
+. `dirname $0`/remote-kafka-env.sh
+
+for i in 1 `seq -s " " 10 10 50` ;
+do
+ kafka_startup
+ ssh $REMOTE_SIM_LOGIN "$SIMULATOR_SCRIPT -kafkaServer=$KAFKA_SERVER -numTopic=10 -reportFile=$REPORT_FILE -time=$TEST_TIME -numConsumer=20 -numProducer=40 -xaxis=fetchSize -msgSize=1000 -fetchSize=$((1024*$i))"
+ kafka_cleanup
+done
Propchange: incubator/kafka/trunk/perf/util-bin/run-fetchsize-test.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/kafka/trunk/perf/util-bin/run-msgsize-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/util-bin/run-msgsize-test.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/util-bin/run-msgsize-test.sh (added)
+++ incubator/kafka/trunk/perf/util-bin/run-msgsize-test.sh Mon Aug 1 23:41:24 2011
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+REMOTE_KAFKA_LOGIN=$1 # user@host format
+REMOTE_SIM_LOGIN=$2
+TEST_TIME=$3
+REPORT_FILE=$4
+
+. `dirname $0`/remote-kafka-env.sh
+
+for i in 1 200 `seq -s " " 1000 1000 10000` ;
+do
+ kafka_startup
+ ssh $REMOTE_SIM_LOGIN "$SIMULATOR_SCRIPT -kafkaServer=$KAFKA_SERVER -numTopic=10 -reportFile=$REPORT_FILE -time=$TEST_TIME -numConsumer=20 -numProducer=40 -xaxis=msgSize -msgSize=$i"
+ kafka_cleanup
+done
Propchange: incubator/kafka/trunk/perf/util-bin/run-msgsize-test.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/kafka/trunk/perf/util-bin/run-numconsumer-sustained.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/util-bin/run-numconsumer-sustained.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/util-bin/run-numconsumer-sustained.sh (added)
+++ incubator/kafka/trunk/perf/util-bin/run-numconsumer-sustained.sh Mon Aug 1 23:41:24 2011
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+REMOTE_KAFKA_LOGIN=$1 # user@host format
+REMOTE_SIM_LOGIN=$2
+TEST_TIME=$3
+REPORT_FILE=$4
+
+. `dirname $0`/remote-kafka-env.sh
+
+for i in 1 `seq -s " " 10 10 50` ;
+do
+ kafka_startup
+ ssh $REMOTE_SIM_LOGIN "$SIMULATOR_SCRIPT -kafkaServer=$KAFKA_SERVER -numTopic=10 -reportFile=$REPORT_FILE -time=$TEST_TIME -numConsumer=$i -numProducer=10 -xaxis=numConsumer"
+ kafka_cleanup
+done
Propchange: incubator/kafka/trunk/perf/util-bin/run-numconsumer-sustained.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/kafka/trunk/perf/util-bin/run-numconsumer-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/util-bin/run-numconsumer-test.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/util-bin/run-numconsumer-test.sh (added)
+++ incubator/kafka/trunk/perf/util-bin/run-numconsumer-test.sh Mon Aug 1 23:41:24 2011
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+REMOTE_KAFKA_LOGIN=$1 # user@host format
+REMOTE_SIM_LOGIN=$2
+TEST_TIME=$3
+REPORT_FILE=$4
+
+. `dirname $0`/remote-kafka-env.sh
+
+kafka_startup
+# You need to twidle this time value depending on test time below
+ssh $REMOTE_SIM_LOGIN "$SIMULATOR_SCRIPT -kafkaServer=$KAFKA_SERVER -numTopic=1 -reportFile=$REPORT_FILE -time=7 -numConsumer=0 -numProducer=10 -xaxis=numConsumer"
+sleep 20
+
+for i in 1 `seq -s " " 10 10 50` ;
+do
+ ssh $REMOTE_SIM_LOGIN "$SIMULATOR_SCRIPT -kafkaServer=$KAFKA_SERVER -numTopic=1 -reportFile=$REPORT_FILE -time=$TEST_TIME -numConsumer=$i -numProducer=0 -xaxis=numConsumer"
+ sleep 10
+done
+
+kafka_cleanup
Propchange: incubator/kafka/trunk/perf/util-bin/run-numconsumer-test.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/kafka/trunk/perf/util-bin/run-numproducer-single-topic.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/util-bin/run-numproducer-single-topic.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/util-bin/run-numproducer-single-topic.sh (added)
+++ incubator/kafka/trunk/perf/util-bin/run-numproducer-single-topic.sh Mon Aug 1 23:41:24 2011
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+REMOTE_KAFKA_LOGIN=$1 # user@host format
+REMOTE_SIM_LOGIN=$2
+TEST_TIME=$3
+REPORT_FILE=$4
+
+. `dirname $0`/remote-kafka-env.sh
+
+for i in 1 `seq -s " " 10 10 50` ;
+do
+ kafka_startup
+ ssh $REMOTE_SIM_LOGIN "$SIMULATOR_SCRIPT -kafkaServer=$KAFKA_SERVER -numTopic=1 -reportFile=$REPORT_FILE -time=$TEST_TIME -numConsumer=0 -numProducer=$i -xaxis=numProducer"
+ kafka_cleanup
+done
Propchange: incubator/kafka/trunk/perf/util-bin/run-numproducer-single-topic.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/kafka/trunk/perf/util-bin/run-numproducer-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/util-bin/run-numproducer-test.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/util-bin/run-numproducer-test.sh (added)
+++ incubator/kafka/trunk/perf/util-bin/run-numproducer-test.sh Mon Aug 1 23:41:24 2011
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+REMOTE_KAFKA_LOGIN=$1 # user@host format
+REMOTE_SIM_LOGIN=$2
+TEST_TIME=$3
+REPORT_FILE=$4
+
+. `dirname $0`/remote-kafka-env.sh
+
+for i in 1 `seq -s " " 10 10 50` ;
+do
+ kafka_startup
+ ssh $REMOTE_SIM_LOGIN "$SIMULATOR_SCRIPT -kafkaServer=$KAFKA_SERVER -numTopic=10 -reportFile=$REPORT_FILE -time=$TEST_TIME -numConsumer=20 -numProducer=$i -xaxis=numProducer"
+ kafka_cleanup
+done
Propchange: incubator/kafka/trunk/perf/util-bin/run-numproducer-test.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/kafka/trunk/perf/util-bin/run-numtopic-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/util-bin/run-numtopic-test.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/util-bin/run-numtopic-test.sh (added)
+++ incubator/kafka/trunk/perf/util-bin/run-numtopic-test.sh Mon Aug 1 23:41:24 2011
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+REMOTE_KAFKA_LOGIN=$1 # user@host format
+REMOTE_SIM_LOGIN=$2
+TEST_TIME=$3
+REPORT_FILE=$4
+
+. `dirname $0`/remote-kafka-env.sh
+
+for i in 1 `seq -s " " 10 10 50` ;
+do
+ kafka_startup
+ ssh $REMOTE_SIM_LOGIN "$SIMULATOR_SCRIPT -kafkaServer=$KAFKA_SERVER -numTopic=$i -reportFile=$REPORT_FILE -time=$TEST_TIME -numConsumer=20 -numProducer=40 -xaxis=numTopic"
+ kafka_cleanup
+done
Propchange: incubator/kafka/trunk/perf/util-bin/run-numtopic-test.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/kafka/trunk/project/build.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/project/build.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/project/build.properties (added)
+++ incubator/kafka/trunk/project/build.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,10 @@
+#Project properties
+#Mon Feb 28 11:55:49 PST 2011
+project.name=Kafka
+sbt.version=0.7.5
+project.version=0.7
+build.scala.versions=2.8.0
+contrib.root.dir=contrib
+lib.dir=lib
+target.dir=target/scala_2.8.0
+dist.dir=dist
Added: incubator/kafka/trunk/project/build/KafkaProject.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/project/build/KafkaProject.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/project/build/KafkaProject.scala (added)
+++ incubator/kafka/trunk/project/build/KafkaProject.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,190 @@
+import sbt._
+
+class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject {
+ // Sub-projects. examples and perf are declared with core as a dependency;
+ // contrib is a parent project for the hadoop producer/consumer modules.
+ lazy val core = project("core", "core-kafka", new CoreKafkaProject(_))
+ lazy val examples = project("examples", "java-examples", new KafkaExamplesProject(_), core)
+ lazy val perf = project("perf", "perf", new KafkaPerfProject(_), core)
+ lazy val contrib = project("contrib", "contrib", new ContribProject(_))
+
+ // The release zip is built by core's packageDistTask.
+ lazy val releaseZipTask = core.packageDistTask
+
+ val releaseZipDescription = "Compiles every sub project, runs unit tests, creates a deployable release zip file with dependencies, config, and scripts."
+ // Packaging of every sub-project and the core tests are declared as dependencies of the zip task.
+ lazy val releaseZip = releaseZipTask dependsOn(core.corePackageAction, core.test, examples.examplesPackageAction, perf.perfPackageAction,
+ contrib.producerPackageAction, contrib.consumerPackageAction) describedAs releaseZipDescription
+
+  /**
+   * Build definition of the core module (artifact "kafka"). Besides the normal
+   * package action it provides packageDist, which assembles the jar, its
+   * dependent jars, the config and bin directories under dist/ and zips them.
+   */
+  class CoreKafkaProject(info: ProjectInfo) extends DefaultProject(info)
+     with IdeaProject with CoreDependencies with TestDependencies {
+    val corePackageAction = packageAllAction
+
+    //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
+    // some dependencies on various sun and javax packages.
+    override def ivyXML =
+      <dependencies>
+        <exclude module="javax"/>
+        <exclude module="jmxri"/>
+        <exclude module="jmxtools"/>
+        <exclude module="mail"/>
+        <exclude module="jms"/>
+      </dependencies>
+
+    override def repositories = Set(ScalaToolsSnapshots, "JBoss Maven 2 Repository" at "http://repository.jboss.com/maven2",
+      "Oracle Maven 2 Repository" at "http://download.oracle.com/maven", "maven.org" at "http://repo2.maven.org/maven2/")
+
+    override def artifactID = "kafka"
+    override def filterScalaJars = false
+
+    // build the executable jar's classpath.
+    // (why is it necessary to explicitly remove the target/{classes,resources} paths? hm.)
+    def dependentJars = {
+      val jars =
+        publicClasspath +++ mainDependencies.scalaJars --- mainCompilePath --- mainResourcesOutputPath
+      if (jars.get.find { jar => jar.name.startsWith("scala-library-") }.isDefined) {
+        // workaround bug in sbt: if the compiler is explicitly included, don't include 2 versions
+        // of the library.
+        jars --- jars.filter { jar =>
+          jar.absolutePath.contains("/boot/") && jar.name == "scala-library.jar"
+        }
+      } else {
+        jars
+      }
+    }
+
+    // Jar file names only; they are referenced from the manifest relative to libs/.
+    def dependentJarNames = dependentJars.getFiles.map(_.getName).filter(_.endsWith(".jar"))
+    override def manifestClassPath = Some(dependentJarNames.map { "libs/" + _ }.mkString(" "))
+
+    def distName = (artifactID + "-" + projectVersion.value)
+    def distPath = "dist" / distName ##
+
+    def configPath = "config" ##
+    def configOutputPath = distPath / "config"
+
+    def binPath = "bin" ##
+    def binOutputPath = distPath / "bin"
+
+    def distZipName = {
+      "%s-%s.zip".format(artifactID, projectVersion.value)
+    }
+
+    // Creates the dist layout, copies jar, dependencies, config and scripts into it
+    // and zips the result. The first failing step's error message is the task result;
+    // a trailing `None` here would discard it and report success unconditionally.
+    lazy val packageDistTask = task {
+      distPath.asFile.mkdirs()
+      (distPath / "libs").asFile.mkdirs()
+      binOutputPath.asFile.mkdirs()
+      configOutputPath.asFile.mkdirs()
+
+      FileUtilities.copyFlat(List(jarPath), distPath, log).left.toOption orElse
+        FileUtilities.copyFlat(dependentJars.get, distPath / "libs", log).left.toOption orElse
+        FileUtilities.copy((configPath ***).get, configOutputPath, log).left.toOption orElse
+        FileUtilities.copy((binPath ***).get, binOutputPath, log).left.toOption orElse
+        FileUtilities.zip((("dist" / distName) ##).get, "dist" / distZipName, true, log)
+    }
+
+    val PackageDistDescription = "Creates a deployable zip file with dependencies, config, and scripts."
+    lazy val packageDist = packageDistTask dependsOn(`package`, `test`) describedAs PackageDistDescription
+
+    val cleanDist = cleanTask("dist" ##) describedAs("Erase any packaged distributions.")
+    override def cleanAction = super.cleanAction dependsOn(cleanDist)
+
+    override def javaCompileOptions = super.javaCompileOptions ++
+      List(JavaCompileOption("-source"), JavaCompileOption("1.5"))
+  }
+
+ // Build definition of the Java examples module (artifact "kafka-java-examples").
+ class KafkaExamplesProject(info: ProjectInfo) extends DefaultProject(info)
+ with IdeaProject
+ with CoreDependencies {
+ val examplesPackageAction = packageAllAction
+ val dependsOnCore = core
+ //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
+ // some dependencies on various sun and javax packages.
+ override def ivyXML =
+ <dependencies>
+ <exclude module="javax"/>
+ <exclude module="jmxri"/>
+ <exclude module="jmxtools"/>
+ <exclude module="mail"/>
+ <exclude module="jms"/>
+ </dependencies>
+
+ override def artifactID = "kafka-java-examples"
+ override def filterScalaJars = false
+ }
+
+ // Build definition of the perf module (artifact "kafka-perf").
+ class KafkaPerfProject(info: ProjectInfo) extends DefaultProject(info)
+ with IdeaProject
+ with CoreDependencies {
+ val perfPackageAction = packageAllAction
+ val dependsOnCore = core
+ //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
+ // some dependencies on various sun and javax packages.
+ override def ivyXML =
+ <dependencies>
+ <exclude module="javax"/>
+ <exclude module="jmxri"/>
+ <exclude module="jmxtools"/>
+ <exclude module="mail"/>
+ <exclude module="jms"/>
+ </dependencies>
+
+ override def artifactID = "kafka-perf"
+ override def filterScalaJars = false
+ }
+
+ // Parent project for the contrib modules: the hadoop producer and the hadoop
+ // consumer, both declared with core as a dependency.
+ class ContribProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject {
+ lazy val hadoopProducer = project("hadoop-producer", "hadoop producer",
+ new HadoopProducerProject(_), core)
+ lazy val hadoopConsumer = project("hadoop-consumer", "hadoop consumer",
+ new HadoopConsumerProject(_), core)
+
+ // Re-exported so the top-level release zip task can depend on them.
+ val producerPackageAction = hadoopProducer.producerPackageAction
+ val consumerPackageAction = hadoopConsumer.consumerPackageAction
+
+ // Hadoop producer module; adds avro and jackson and additionally excludes netty.
+ class HadoopProducerProject(info: ProjectInfo) extends DefaultProject(info)
+ with IdeaProject
+ with CoreDependencies {
+ val producerPackageAction = packageAllAction
+ override def ivyXML =
+ <dependencies>
+ <exclude module="netty"/>
+ <exclude module="javax"/>
+ <exclude module="jmxri"/>
+ <exclude module="jmxtools"/>
+ <exclude module="mail"/>
+ <exclude module="jms"/>
+ </dependencies>
+
+ val avro = "org.apache.avro" % "avro" % "1.4.1"
+ val jacksonCore = "org.codehaus.jackson" % "jackson-core-asl" % "1.5.5"
+ val jacksonMapper = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.5.5"
+ }
+
+ // Hadoop consumer module; adds joda-time and commons-httpclient and additionally excludes netty.
+ class HadoopConsumerProject(info: ProjectInfo) extends DefaultProject(info)
+ with IdeaProject
+ with CoreDependencies {
+ val consumerPackageAction = packageAllAction
+ override def ivyXML =
+ <dependencies>
+ <exclude module="netty"/>
+ <exclude module="javax"/>
+ <exclude module="jmxri"/>
+ <exclude module="jmxtools"/>
+ <exclude module="mail"/>
+ <exclude module="jms"/>
+ </dependencies>
+
+ val jodaTime = "joda-time" % "joda-time" % "1.6"
+ val httpclient = "commons-httpclient" % "commons-httpclient" % "3.1"
+ }
+ }
+
+ // Test-scoped library dependencies (mixed into CoreKafkaProject).
+ trait TestDependencies {
+ val easymock = "org.easymock" % "easymock" % "3.0" % "test"
+ val junit = "junit" % "junit" % "4.1" % "test"
+ val scalaTest = "org.scalatest" % "scalatest" % "1.2" % "test"
+ }
+
+ // Library dependencies shared by every module that mixes this trait in.
+ trait CoreDependencies {
+ val log4j = "log4j" % "log4j" % "1.2.15"
+ val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2"
+ }
+
+}
Added: incubator/kafka/trunk/project/plugins/Plugins.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/project/plugins/Plugins.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/project/plugins/Plugins.scala (added)
+++ incubator/kafka/trunk/project/plugins/Plugins.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,6 @@
+import sbt._
+
+class Plugins(info: ProjectInfo) extends PluginDefinition(info) { // sbt plugin definition for this build
+ val repo = "GH-pages repo" at "http://mpeltonen.github.com/maven/" // extra resolver; presumably where the sbt-idea plugin below is hosted
+ val idea = "com.github.mpeltonen" % "sbt-idea-plugin" % "0.1-SNAPSHOT" // presumably supplies the IdeaProject trait mixed into the project definitions; confirm
+}
Added: incubator/kafka/trunk/sbt
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/sbt?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/sbt (added)
+++ incubator/kafka/trunk/sbt Mon Aug 1 23:41:24 2011
@@ -0,0 +1 @@
+java -Xmx1024M -XX:MaxPermSize=512m -jar `dirname $0`/lib/sbt-launch.jar "$@"
Propchange: incubator/kafka/trunk/sbt
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/kafka/trunk/system_test/embedded_consumer/README
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/README?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/README (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/README Mon Aug 1 23:41:24 2011
@@ -0,0 +1,8 @@
+This test replicates messages from 3 kafka brokers to 2 other kafka brokers using the embedded consumer.
+At the end, the messages produced at the source brokers should match those at the target brokers.
+
+To run this test, do
+bin/run-test.sh
+
+The expected output is given in bin/expected.out. There is only 1 thing that's important.
+1. The output should have a line "test passed".
Added: incubator/kafka/trunk/system_test/embedded_consumer/bin/expected.out
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/bin/expected.out?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/bin/expected.out (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/bin/expected.out Mon Aug 1 23:41:24 2011
@@ -0,0 +1,18 @@
+start the servers ...
+start producing messages ...
+wait for consumer to finish consuming ...
+[2011-05-17 14:49:11,605] INFO Creating async producer for broker id = 2 at localhost:9091 (kafka.producer.ProducerPool)
+[2011-05-17 14:49:11,606] INFO Creating async producer for broker id = 1 at localhost:9092 (kafka.producer.ProducerPool)
+[2011-05-17 14:49:11,607] INFO Creating async producer for broker id = 3 at localhost:9090 (kafka.producer.ProducerPool)
+thread 0: 400000 messages sent 3514012.1233 nMsg/sec 3.3453 MBs/sec
+[2011-05-17 14:49:34,382] INFO Closing all async producers (kafka.producer.ProducerPool)
+[2011-05-17 14:49:34,383] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+[2011-05-17 14:49:34,384] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+[2011-05-17 14:49:34,385] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+Total Num Messages: 400000 bytes: 79859641 in 22.93 secs
+Messages/sec: 17444.3960
+MB/sec: 3.3214
+test passed
+stopping the servers
+bin/../../../bin/zookeeper-server-start.sh: line 9: 22584 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
+bin/../../../bin/zookeeper-server-start.sh: line 9: 22585 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
Added: incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh Mon Aug 1 23:41:24 2011
@@ -0,0 +1,79 @@
+#!/bin/bash
+
+num_messages=400000
+message_size=400
+
+base_dir=$(dirname $0)/..
+
+rm -rf /tmp/zookeeper_source
+rm -rf /tmp/zookeeper_target
+rm -rf /tmp/kafka-source1-logs
+mkdir /tmp/kafka-source1-logs
+mkdir /tmp/kafka-source1-logs/test01-0
+touch /tmp/kafka-source1-logs/test01-0/00000000000000000000.kafka
+rm -rf /tmp/kafka-source2-logs
+mkdir /tmp/kafka-source2-logs
+mkdir /tmp/kafka-source2-logs/test01-0
+touch /tmp/kafka-source2-logs/test01-0/00000000000000000000.kafka
+rm -rf /tmp/kafka-source3-logs
+mkdir /tmp/kafka-source3-logs
+mkdir /tmp/kafka-source3-logs/test01-0
+touch /tmp/kafka-source3-logs/test01-0/00000000000000000000.kafka
+rm -rf /tmp/kafka-target1-logs
+rm -rf /tmp/kafka-target2-logs
+
+echo "start the servers ..."
+$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source.properties 2>&1 > $base_dir/zookeeper_source.log &
+$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log &
+$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source1.properties 2>&1 > $base_dir/kafka_source1.log &
+$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source2.properties 2>&1 > $base_dir/kafka_source2.log &
+$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source3.properties 2>&1 > $base_dir/kafka_source3.log &
+$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/consumer.properties 2>&1 > $base_dir/kafka_target1.log &
+$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/consumer.properties 2>&1 > $base_dir/kafka_target2.log &
+
+sleep 4
+echo "start producing messages ..."
+$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval 400000 num_messages --async --delay-btw-batch-ms 10 &
+
+echo "wait for consumer to finish consuming ..."
+cur1_offset="-1"
+cur2_offset="-1"
+quit1=0
+quit2=0
+while [ $quit1 -eq 0 ] && [ $quit2 -eq 0 ]
+do
+ sleep 2
+ target1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+ if [ $target1_size -eq $cur1_offset ]
+ then
+ quit1=1
+ fi
+ cur1_offset=$target1_size
+ target2_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+ if [ $target2_size -eq $cur2_offset ]
+ then
+ quit2=1
+ fi
+ cur2_offset=$target2_size
+done
+
+sleep 2
+source_part0_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+source_part1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+source_part2_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+target_part0_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+target_part1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+
+expected_size=`expr $source_part0_size + $source_part1_size + $source_part2_size`
+actual_size=`expr $target_part0_size + $target_part1_size`
+if [ $expected_size != $actual_size ]
+then
+ echo "source size: $expected_size target size: $actual_size test failed!!! look at it!!!"
+else
+ echo "test passed"
+fi
+
+echo "stopping the servers"
+ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 2>&1 > /dev/null
+sleep 2
+ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 2>&1 > /dev/null
Propchange: incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,14 @@
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=group1
+
+embeddedconsumer.topics=test01:1
Added: incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,64 @@
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=1
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost. If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9092
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source1-logs
+
+# the send buffer used by the socket server
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=10000000
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4
Added: incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,64 @@
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=2
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost. If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9091
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source2-logs
+
+# the send buffer used by the socket server
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4
Added: incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,64 @@
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=3
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost. If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9090
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source3-logs
+
+# the send buffer used by the socket server
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4
Added: incubator/kafka/trunk/system_test/embedded_consumer/config/server_target1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/server_target1.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/server_target1.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/server_target1.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,64 @@
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=1
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost. If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9093
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-target1-logs
+
+# the send buffer used by the socket server
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2182
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4
Added: incubator/kafka/trunk/system_test/embedded_consumer/config/server_target2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/server_target2.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/server_target2.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/server_target2.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,64 @@
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=2
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost. If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9094
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-target2-logs
+
+# the send buffer used by the socket server
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2182
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4
Added: incubator/kafka/trunk/system_test/embedded_consumer/config/zookeeper_source.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/zookeeper_source.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/zookeeper_source.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/zookeeper_source.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,4 @@
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper_source
+# the port at which the clients will connect
+clientPort=2181
Added: incubator/kafka/trunk/system_test/embedded_consumer/config/zookeeper_target.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/zookeeper_target.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/zookeeper_target.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/zookeeper_target.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,4 @@
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper_target
+# the port at which the clients will connect
+clientPort=2182
Added: incubator/kafka/trunk/system_test/embedded_consumer/expected.out
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/expected.out?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/expected.out (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/expected.out Mon Aug 1 23:41:24 2011
@@ -0,0 +1,11 @@
+start the servers ...
+start producing messages ...
+Total Num Messages: 10000000 bytes: 1994374785 in 106.076 secs
+Messages/sec: 94272.0314
+MB/sec: 17.9304
+[2011-05-02 11:50:29,022] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer)
+wait for consumer to finish consuming ...
+test passed
+bin/../../../bin/kafka-server-start.sh: line 11: 359 Terminated $(dirname $0)/kafka-run-class.sh kafka.Kafka $@
+bin/../../../bin/zookeeper-server-start.sh: line 9: 357 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
+bin/../../../bin/zookeeper-server-start.sh: line 9: 358 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
Added: incubator/kafka/trunk/system_test/producer_perf/README
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/producer_perf/README?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/producer_perf/README (added)
+++ incubator/kafka/trunk/system_test/producer_perf/README Mon Aug 1 23:41:24 2011
@@ -0,0 +1,9 @@
+This test produces a large number of messages to a broker. It measures the throughput and verifies
+that the amount of data received matches the expected amount.
+
+To run this test, do
+bin/run-test.sh
+
+The expected output is given in bin/expected.out. There are 2 things to pay attention to:
+1. The output should have a line "test passed".
+2. The throughput from the producer should be around 300,000 Messages/sec on a typical machine.
Added: incubator/kafka/trunk/system_test/producer_perf/bin/expected.out
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/producer_perf/bin/expected.out?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/producer_perf/bin/expected.out (added)
+++ incubator/kafka/trunk/system_test/producer_perf/bin/expected.out Mon Aug 1 23:41:24 2011
@@ -0,0 +1,32 @@
+start the servers ...
+start producing 2000000 messages ...
+[2011-05-17 14:31:12,568] INFO Creating async producer for broker id = 0 at localhost:9092 (kafka.producer.ProducerPool)
+thread 0: 100000 messages sent 3272786.7779 nMsg/sec 3.1212 MBs/sec
+thread 0: 200000 messages sent 3685956.5057 nMsg/sec 3.5152 MBs/sec
+thread 0: 300000 messages sent 3717472.1190 nMsg/sec 3.5453 MBs/sec
+thread 0: 400000 messages sent 3730647.2673 nMsg/sec 3.5578 MBs/sec
+thread 0: 500000 messages sent 3730647.2673 nMsg/sec 3.5578 MBs/sec
+thread 0: 600000 messages sent 3722315.2801 nMsg/sec 3.5499 MBs/sec
+thread 0: 700000 messages sent 3718854.5928 nMsg/sec 3.5466 MBs/sec
+thread 0: 800000 messages sent 3714020.4271 nMsg/sec 3.5420 MBs/sec
+thread 0: 900000 messages sent 3713330.8578 nMsg/sec 3.5413 MBs/sec
+thread 0: 1000000 messages sent 3710575.1391 nMsg/sec 3.5387 MBs/sec
+thread 0: 1100000 messages sent 3711263.6853 nMsg/sec 3.5393 MBs/sec
+thread 0: 1200000 messages sent 3716090.6726 nMsg/sec 3.5439 MBs/sec
+thread 0: 1300000 messages sent 3709198.8131 nMsg/sec 3.5374 MBs/sec
+thread 0: 1400000 messages sent 3705762.4606 nMsg/sec 3.5341 MBs/sec
+thread 0: 1500000 messages sent 3701647.2330 nMsg/sec 3.5302 MBs/sec
+thread 0: 1600000 messages sent 3696174.4594 nMsg/sec 3.5249 MBs/sec
+thread 0: 1700000 messages sent 3703703.7037 nMsg/sec 3.5321 MBs/sec
+thread 0: 1800000 messages sent 3703017.9596 nMsg/sec 3.5315 MBs/sec
+thread 0: 1900000 messages sent 3700277.5208 nMsg/sec 3.5289 MBs/sec
+thread 0: 2000000 messages sent 3702332.4695 nMsg/sec 3.5308 MBs/sec
+[2011-05-17 14:33:01,102] INFO Closing all async producers (kafka.producer.ProducerPool)
+[2011-05-17 14:33:01,103] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+Total Num Messages: 2000000 bytes: 400000000 in 108.678 secs
+Messages/sec: 18402.9886
+MB/sec: 3.5101
+wait for data to be persisted
+test passed
+bin/../../../bin/kafka-server-start.sh: line 11: 21110 Terminated $(dirname $0)/kafka-run-class.sh kafka.Kafka $@
+bin/../../../bin/zookeeper-server-start.sh: line 9: 21109 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
Added: incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh (added)
+++ incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh Mon Aug 1 23:41:24 2011
@@ -0,0 +1,48 @@
+#!/bin/bash
+
+num_messages=2000000
+message_size=200
+
+base_dir=$(dirname $0)/..
+
+rm -rf /tmp/zookeeper
+rm -rf /tmp/kafka-logs
+
+echo "start the servers ..."
+$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper.properties 2>&1 > $base_dir/zookeeper.log &
+$base_dir/../../bin/kafka-server-start.sh $base_dir/config/server.properties 2>&1 > $base_dir/kafka.log &
+
+sleep 4
+echo "start producing $num_messages messages ..."
+$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --delay-btw-batch-ms 10 --compression-codec 1
+
+echo "wait for data to be persisted"
+cur_offset="-1"
+quit=0
+while [ $quit -eq 0 ]
+do
+ sleep 2
+ target_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+ if [ $target_size -eq $cur_offset ]
+ then
+ quit=1
+ fi
+ cur_offset=$target_size
+done
+
+sleep 2
+actual_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+num_batches=`expr $num_messages \/ $message_size`
+expected_size=`expr $num_batches \* 262`
+
+if [ $actual_size != $expected_size ]
+then
+ echo "actual size: $actual_size expected size: $expected_size test failed!!! look at it!!!"
+else
+ echo "test passed"
+fi
+
+ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null
+sleep 2
+ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null
+
Propchange: incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh (added)
+++ incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh Mon Aug 1 23:41:24 2011
@@ -0,0 +1,48 @@
+#!/bin/bash
+
+num_messages=2000000
+message_size=200
+
+base_dir=$(dirname $0)/..
+
+rm -rf /tmp/zookeeper
+rm -rf /tmp/kafka-logs
+
+echo "start the servers ..."
+$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper.properties 2>&1 > $base_dir/zookeeper.log &
+$base_dir/../../bin/kafka-server-start.sh $base_dir/config/server.properties 2>&1 > $base_dir/kafka.log &
+
+sleep 4
+echo "start producing $num_messages messages ..."
+$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --delay-btw-batch-ms 10
+
+echo "wait for data to be persisted"
+cur_offset="-1"
+quit=0
+while [ $quit -eq 0 ]
+do
+ sleep 2
+ target_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+ if [ $target_size -eq $cur_offset ]
+ then
+ quit=1
+ fi
+ cur_offset=$target_size
+done
+
+sleep 2
+actual_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+msg_full_size=`expr $message_size + 10`
+expected_size=`expr $num_messages \* $msg_full_size`
+
+if [ $actual_size != $expected_size ]
+then
+ echo "actual size: $actual_size expected size: $expected_size test failed!!! look at it!!!"
+else
+ echo "test passed"
+fi
+
+ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null
+sleep 2
+ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null
+
Propchange: incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/kafka/trunk/system_test/producer_perf/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/producer_perf/config/server.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/producer_perf/config/server.properties (added)
+++ incubator/kafka/trunk/system_test/producer_perf/config/server.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,64 @@
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=0
+
+# hostname of the broker. If not set, the value returned by getLocalHost
+# is used. If the machine has multiple interfaces, getLocalHost
+# may not return the one you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9092
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-logs
+
+# the send buffer used by the socket server
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# interval in ms at which the time-based topic flusher is scheduled
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4
Added: incubator/kafka/trunk/system_test/producer_perf/config/zookeeper.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/producer_perf/config/zookeeper.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/producer_perf/config/zookeeper.properties (added)
+++ incubator/kafka/trunk/system_test/producer_perf/config/zookeeper.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,4 @@
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper
+# the port at which the clients will connect
+clientPort=2181
|