kafka-commits mailing list archives

From jun...@apache.org
Subject svn commit: r1350316 - in /incubator/kafka/branches/0.8/system_test: common/ single_host_multi_brokers/ single_host_multi_brokers/bin/ single_host_multi_brokers/config/
Date Thu, 14 Jun 2012 16:23:30 GMT
Author: junrao
Date: Thu Jun 14 16:23:30 2012
New Revision: 1350316

URL: http://svn.apache.org/viewvc?rev=1350316&view=rev
Log:
system test to validate consistency of replicas; patched by John Fung; reviewed by Jun Rao;
KAFKA-341

Added:
    incubator/kafka/branches/0.8/system_test/common/
    incubator/kafka/branches/0.8/system_test/common/util.sh
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/README
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/kafka-run-class.sh   (with props)
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh   (with props)
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/consumer.properties
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/log4j.properties
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/producer.properties
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/server.properties
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/zookeeper.properties

Added: incubator/kafka/branches/0.8/system_test/common/util.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/common/util.sh?rev=1350316&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/common/util.sh (added)
+++ incubator/kafka/branches/0.8/system_test/common/util.sh Thu Jun 14 16:23:30 2012
@@ -0,0 +1,168 @@
+#!/bin/bash
+
+# =========================================
+# info - print messages with timestamp
+# =========================================
+info() {
+    echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
+}
+
+# =========================================
+# info_no_newline - print messages with
+# timestamp without newline
+# =========================================
+info_no_newline() {
+    echo -e -n "$(date +"%Y-%m-%d %H:%M:%S") $*"
+}
+
+# =========================================
+# get_random_range - return a random number
+#     between the lower & upper bounds
+# usage:
+#     get_random_range $lower $upper
+#     random_no=$?
+# =========================================
+get_random_range() {
+    lo=$1
+    up=$2
+    range=$(($up - $lo + 1))
+
+    return $(($(($RANDOM % range)) + $lo))
+}
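+# NOTE: the value is handed back through the exit status, so it is only
+# reliable for ranges within 0-255.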
+
+# =========================================
+# kill_child_processes - terminate a
+# process and its child processes
+# =========================================
+kill_child_processes() {
+    isTopmost=$1
+    curPid=$2
+    childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}')
+
+    for childPid in $childPids
+    do
+        kill_child_processes 0 $childPid
+    done
+    if [ $isTopmost -eq 0 ]; then
+        kill -15 $curPid 2> /dev/null
+    fi
+}
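+# example: "kill_child_processes 0 $pid" terminates $pid and all of its
+# descendants; a first argument of 1 spares the topmost process and
+# terminates only its descendants.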
+
+# =========================================================================
+# generate_kafka_properties_files -
+# 1. it takes the following arguments and generates server_{1..n}.properties
+#    for the total no. of kafka brokers as specified in "num_server"; the
+#    resulting properties files will be located at:
+#      <kafka home>/system_test/<test suite>/config
+# 2. the default values in the generated properties files will be copied
+#    from the settings in config/server.properties while the brokerid and
+#    server port will be incremented accordingly
+# 3. to generate properties files with non-default values such as
+#    "socket.send.buffer=2097152", simply add the property with its new value
+#    to the array variable kafka_properties_to_replace as shown below
+# =========================================================================
+generate_kafka_properties_files() {
+
+    test_suite_full_path=$1      # eg. <kafka home>/system_test/single_host_multi_brokers
+    num_server=$2                # total no. of brokers in the cluster
+    brokerid_to_start=$3         # this should be '0' in most cases
+    kafka_port_to_start=$4       # if 9091 is used, the rest would be 9092, 9093, ...
+
+    this_config_dir=${test_suite_full_path}/config
+
+    # info "test suite full path : $test_suite_full_path"
+    # info "broker id to start   : $brokerid_to_start"
+    # info "kafka port to start  : $kafka_port_to_start"
+    # info "num of server        : $num_server"
+    # info "config dir           : $this_config_dir"
+
+    # =============================================
+    # array to keep the kafka properties statements
+    # from the file 'server.properties' that need
+    # to be changed from their default values
+    # =============================================
+    # kafka_properties_to_replace     # DO NOT uncomment this line !!
+
+    # =============================================
+    # Uncomment the following kafka properties
+    # array element as needed to change the default
+    # values. Other kafka properties can be added
+    # in a similar fashion.
+    # =============================================
+    # kafka_properties_to_replace[1]="socket.send.buffer=2097152"
+    # kafka_properties_to_replace[2]="socket.receive.buffer=2097152"
+    # kafka_properties_to_replace[3]="num.partitions=3"
+    # kafka_properties_to_replace[4]="max.socket.request.bytes=10485760"
+
+    server_properties=`cat ${this_config_dir}/server.properties`
+
+    for ((i=1; i<=$num_server; i++))
+    do
+        # ======================
+        # update misc properties
+        # ======================
+        for ((j=1; j<=${#kafka_properties_to_replace[@]}; j++))
+        do
+            keyword_to_replace=`echo ${kafka_properties_to_replace[${j}]} | awk -F '=' '{print $1}'`
+            string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
+
+            # info "string to be replaced : [$string_to_be_replaced]"
+            # info "string to replace     : [${kafka_properties_to_replace[${j}]}]"
+
+            echo "${server_properties}" | \
+              sed -e "s/${string_to_be_replaced}/${kafka_properties_to_replace[${j}]}/g" \
+              >${this_config_dir}/server_${i}.properties
+
+            server_properties=`cat ${this_config_dir}/server_${i}.properties`
+        done
+
+        # ======================
+        # update brokerid
+        # ======================
+        keyword_to_replace="brokerid="
+        string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
+        brokerid_idx=$(( $brokerid_to_start + $i - 1 ))
+        string_to_replace="${keyword_to_replace}${brokerid_idx}"
+        # info "string to be replaced : [${string_to_be_replaced}]"
+        # info "string to replace     : [${string_to_replace}]"
+
+        echo "${server_properties}" | \
+          sed -e "s/${string_to_be_replaced}/${string_to_replace}/g" \
+          >${this_config_dir}/server_${i}.properties
+
+        server_properties=`cat ${this_config_dir}/server_${i}.properties`
+
+        # ======================
+        # update kafka_port
+        # ======================
+        keyword_to_replace="port="
+        string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
+        port_idx=$(( $kafka_port_to_start + $i - 1 ))
+        string_to_replace="${keyword_to_replace}${port_idx}"
+        # info "string to be replaced : [${string_to_be_replaced}]"
+        # info "string to replace     : [${string_to_replace}]"
+
+        echo "${server_properties}" | \
+          sed -e "s/${string_to_be_replaced}/${string_to_replace}/g" \
+          >${this_config_dir}/server_${i}.properties
+
+        server_properties=`cat ${this_config_dir}/server_${i}.properties`
+
+        # ======================
+        # update kafka_log dir
+        # ======================
+        keyword_to_replace="log.dir="
+        string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
+        string_to_be_replaced=${string_to_be_replaced//\//\\\/}
+        string_to_replace="${keyword_to_replace}\/tmp\/kafka_server_${i}_logs"
+        # info "string to be replaced : [${string_to_be_replaced}]"
+        # info "string to replace     : [${string_to_replace}]"
+
+        echo "${server_properties}" | \
+          sed -e "s/${string_to_be_replaced}/${string_to_replace}/g" \
+          >${this_config_dir}/server_${i}.properties
+
+        server_properties=`cat ${this_config_dir}/server_${i}.properties`
+
+    done
+}
+
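+# Example invocation (hypothetical <kafka home> path), matching the call
+# made by single_host_multi_brokers/bin/run-test.sh -- generates
+# server_{1..3}.properties with brokerids 0..2 and ports 9091..9093:
+#
+#     generate_kafka_properties_files \
+#         /home/user/kafka/system_test/single_host_multi_brokers 3 0 9091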

Added: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/README
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/README?rev=1350316&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/README (added)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/README Thu Jun 14 16:23:30 2012
@@ -0,0 +1,39 @@
+==================
+Test Descriptions
+==================
+This test suite performs a Kafka system test of the replication feature as described below:
+1. Start the Kafka cluster
+2. Create a topic
+3. Find the leader
+4. Stop the leader found in Step 3
+5. Send n messages
+6. Consume the messages
+7. Restart the leader stopped in Step 4
+8. Go to Step 3 and repeat for each server in the cluster
+9. Validate the test results
+
+==================
+Quick Start
+==================
+1. Modify the values of "num_kafka_server" & "replica_factor" as needed.
+2. Execute the test as: 
+   <kafka home>/system_test/single_host_multi_brokers $ bin/run-test.sh
+
+==================
+Expected Results
+==================
+The following items should match:
+1. The checksums of the data files in all replicas
+2. The sizes of the data files in all replicas
+3. The no. of messages produced and consumed
+
+==================
+Notes
+==================
+1. There is no need to copy and paste the config/server.properties
+   file to match the no. of brokers in the Kafka cluster.
+2. The required no. of server properties files will be automatically
+   generated according to the value of "num_kafka_server".
+3. The default values in the generated properties files will be
+   copied from the settings in config/server.properties while the
+   brokerid and server port will be incremented accordingly.

Added: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/kafka-run-class.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/kafka-run-class.sh?rev=1350316&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/kafka-run-class.sh (added)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/kafka-run-class.sh Thu Jun 14 16:23:30 2012
@@ -0,0 +1,67 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ $# -lt 1 ];
+then
+  echo "USAGE: $0 classname [opts]"
+  exit 1
+fi
+
+base_dir=$(dirname $0)/..
+kafka_inst_dir=${base_dir}/../..
+
+for file in $kafka_inst_dir/project/boot/scala-2.8.0/lib/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $kafka_inst_dir/core/target/scala_2.8.0/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $kafka_inst_dir/core/lib/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $kafka_inst_dir/perf/target/scala_2.8.0/kafka*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $kafka_inst_dir/core/lib_managed/scala_2.8.0/compile/*.jar;
+do
+  if [ ${file##*/} != "sbt-launch.jar" ]; then
+    CLASSPATH=$CLASSPATH:$file
+  fi
+done
+if [ -z "$KAFKA_JMX_OPTS" ]; then
+  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
+fi
+if [ -z "$KAFKA_OPTS" ]; then
+  KAFKA_OPTS="-Xmx512M -server  -Dlog4j.configuration=file:$base_dir/config/log4j.properties"
+fi
+if [ $JMX_PORT ]; then
+  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
+fi
+if [ -z "$JAVA_HOME" ]; then
+  JAVA="java"
+else
+  JAVA="$JAVA_HOME/bin/java"
+fi
+
+$JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH "$@"
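
As exercised by run-test.sh, this wrapper takes the class name followed by the class arguments, e.g. (server_1.properties is one of the files generated by the suite):

    bin/kafka-run-class.sh kafka.Kafka config/server_1.properties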

Propchange: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/kafka-run-class.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh?rev=1350316&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh (added)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh Thu Jun 14 16:23:30 2012
@@ -0,0 +1,429 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ==================================================================
+# run-test.sh
+# 
+# ==================================================================
+
+# ====================================
+# Do not change the following
+# (keep this section at the beginning
+# of this script)
+# ====================================
+readonly system_test_root=$(dirname $0)/../..         # path of <kafka install>/system_test
+readonly common_dir=${system_test_root}/common        # common util scripts for system_test
+source   ${common_dir}/util.sh                        # include the util script
+
+readonly base_dir=$(dirname $0)/..                    # root of this test suite
+readonly base_dir_full_path=$(readlink -f $base_dir)  # full path of the root of this test suite
+readonly config_dir=${base_dir}/config
+
+readonly test_start_time="$(date +%s)"                # time starting the test
+
+# ====================================
+# Change the following as needed
+# ====================================
+readonly num_kafka_server=3                           # same no. of property files such as server_{1..n}.properties
+                                                      # will be automatically generated
+readonly replica_factor=3                             # should be less than or equal to "num_kafka_server"
+readonly my_brokerid_to_start=0                       # this should be '0' for now
+readonly my_server_port_to_start=9091                 # if using this default, the ports to be used will be 9091, 9092, ...
+readonly producer_msg_batch_size=200                  # batch no. of messages by producer
+readonly consumer_timeout_ms=10000                    # elapsed time for consumer to timeout and exit
+
+# ====================================
+# No need to change the following
+# configurations in most cases
+# ====================================
+readonly test_topic=mytest
+readonly max_wait_for_consumer_complete=30
+readonly zk_prop_pathname=${config_dir}/zookeeper.properties
+readonly zk_log4j_log_pathname=${base_dir}/zookeeper.log
+
+readonly producer_prop_pathname=${config_dir}/producer.properties
+readonly consumer_prop_pathname=${config_dir}/consumer.properties
+
+readonly producer_perf_log_pathname=${base_dir}/producer_perf_output.log
+readonly producer_perf_crc_log_pathname=${base_dir}/producer_perf_crc.log
+readonly producer_perf_crc_sorted_log_pathname=${base_dir}/producer_perf_crc_sorted.log
+readonly producer_perf_crc_sorted_uniq_log_pathname=${base_dir}/producer_perf_crc_sorted_uniq.log
+
+readonly console_consumer_log_pathname=${base_dir}/console_consumer.log
+readonly console_consumer_crc_log_pathname=${base_dir}/console_consumer_crc.log
+readonly console_consumer_crc_sorted_log_pathname=${base_dir}/console_consumer_crc_sorted.log
+readonly console_consumer_crc_sorted_uniq_log_pathname=${base_dir}/console_consumer_crc_sorted_uniq.log
+
+readonly this_test_stderr_output_log_pathname=${base_dir}/this_test_stderr_output.log
+
+# ====================================
+# arrays for kafka brokers properties
+# ====================================
+kafka_data_log_dirs=
+kafka_log4j_log_pathnames=
+kafka_prop_pathnames=
+kafka_brokerids=
+kafka_sock_ports=
+#kafka_first_data_file_sizes=
+#kafka_first_data_file_checksums=
+
+# ====================================
+# Misc
+# ====================================
+zk_port=
+zk_data_log_dir=
+pid_zk=
+kafka_pids=
+test_failure_counter=0
+
+initialize() {
+    info "initializing ..."
+
+    zk_port=`grep clientPort ${zk_prop_pathname} | awk -F '=' '{print $2}'`
+    zk_data_log_dir=`grep dataDir ${zk_prop_pathname} | awk -F '=' '{print $2}'`
+
+    for ((i=1; i<=$num_kafka_server; i++))
+    do
+        kafka_log4j_log_pathnames[${i}]=$base_dir/kafka_server_${i}.log
+        kafka_prop_pathnames[${i}]=${config_dir}/server_${i}.properties
+
+        kafka_data_log_dirs[${i}]=`grep ^log.dir ${kafka_prop_pathnames[${i}]} | awk -F '=' '{print $2}'`
+        kafka_brokerids[${i}]=`grep ^brokerid= ${kafka_prop_pathnames[${i}]} | awk -F '=' '{print $2}'`
+        kafka_sock_ports[${i}]=`grep ^port= ${kafka_prop_pathnames[${i}]} | awk -F '=' '{print $2}'`
+
+        info "kafka $i data dir   : ${kafka_data_log_dirs[$i]}"
+        info "kafka $i log4j log  : ${kafka_log4j_log_pathnames[$i]}"
+        info "kafka $i prop file  : ${kafka_prop_pathnames[$i]}"
+        info "kafka $i brokerid   : ${kafka_brokerids[$i]}"
+        info "kafka $i socket     : ${kafka_sock_ports[$i]}"
+    done
+
+    info "zookeeper port     : $zk_port"
+    info "zookeeper data dir : $zk_data_log_dir"
+    echo
+}
+
+cleanup() {
+    info "cleaning up kafka server log/data dir"
+    for ((i=1; i<=$num_kafka_server; i++))
+    do
+        rm -rf ${kafka_data_log_dirs[$i]}
+        rm -f ${kafka_log4j_log_pathnames[$i]}
+    done
+
+    rm -rf $zk_data_log_dir
+    rm -f $zk_log4j_log_pathname
+    rm -f $this_test_stderr_output_log_pathname
+
+    rm -f $producer_perf_log_pathname
+    rm -f $producer_perf_crc_log_pathname
+    rm -f $producer_perf_crc_sorted_log_pathname
+    rm -f $producer_perf_crc_sorted_uniq_log_pathname
+
+    rm -f $console_consumer_log_pathname
+    rm -f $console_consumer_crc_log_pathname
+    rm -f $console_consumer_crc_sorted_log_pathname
+    rm -f $console_consumer_crc_sorted_uniq_log_pathname
+}
+
+get_leader_brokerid() {
+    log_line=`grep -i -h 'is leader' ${base_dir}/kafka_server_*.log | sort | tail -1`
+    info "found the log line: $log_line"
+    broker_id=`echo $log_line | awk -F ' ' '{print $5}'`
+
+    return $broker_id
+}
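+# NOTE: the broker id is handed back through the exit status (read via $?
+# by the caller), so this assumes broker ids stay within 0-255.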
+
+start_zk() {
+    info "starting zookeeper"
+    $base_dir/../../bin/zookeeper-server-start.sh $zk_prop_pathname \
+        > ${zk_log4j_log_pathname} 2>&1 &
+    pid_zk=$!
+}
+
+stop_server() {
+    s_idx=$1
+
+    info "stopping server: $s_idx"
+
+    if [ "x${kafka_pids[${s_idx}]}" != "x" ]; then
+        kill_child_processes 0 ${kafka_pids[${s_idx}]};
+    fi
+
+    kafka_pids[${s_idx}]=
+}
+
+start_server() {
+    s_idx=$1
+
+    info "starting kafka server"
+    $base_dir/bin/kafka-run-class.sh kafka.Kafka ${kafka_prop_pathnames[$s_idx]} \
+        >> ${kafka_log4j_log_pathnames[$s_idx]} 2>&1 &
+    kafka_pids[${s_idx}]=$!
+    info "  -> kafka_pids[$s_idx]: ${kafka_pids[$s_idx]}"
+}
+
+start_servers_cluster() {
+    info "starting cluster"
+
+    for ((i=1; i<=$num_kafka_server; i++)) 
+    do
+        start_server $i
+    done
+}
+
+start_producer_perf() {
+    this_topic=$1
+    zk_conn_str=$2
+    no_msg_to_produce=$3
+
+    info "starting producer performance"
+
+    ${base_dir}/bin/kafka-run-class.sh kafka.perf.ProducerPerformance \
+        --brokerinfo "zk.connect=${zk_conn_str}" \
+        --topic ${this_topic} \
+        --messages $no_msg_to_produce \
+        --vary-message-size \
+        --message-size 100 \
+        --threads 1 \
+        --async \
+        >> $producer_perf_log_pathname 2>&1
+}
+
+start_console_consumer() {
+    this_consumer_topic=$1
+    this_zk_conn_str=$2
+
+    info "starting console consumer"
+    $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \
+        --zookeeper $this_zk_conn_str \
+        --topic $this_consumer_topic \
+        --formatter 'kafka.consumer.ConsoleConsumer$ChecksumMessageFormatter' \
+        --consumer-timeout-ms $consumer_timeout_ms \
+        >> $console_consumer_log_pathname 2>&1 &
+}
+
+shutdown_servers() {
+
+    info "shutting down servers"
+    for ((i=1; i<=$num_kafka_server; i++))
+    do
+        if [ "x${kafka_pids[$i]}" != "x" ]; then
+            kill_child_processes 0 ${kafka_pids[$i]};
+        fi
+    done
+
+    info "shutting down zookeeper servers"
+    if [ "x${pid_zk}" != "x" ]; then kill_child_processes 0 ${pid_zk}; fi
+}
+
+force_shutdown_producer() {
+    info "force shutdown producer"
+    ps auxw | grep ProducerPerformance | grep -v grep | awk '{print $2}' | xargs kill -9
+}
+
+force_shutdown_consumer() {
+    info "force shutdown consumer"
+    ps auxw | grep ConsoleConsumer | grep -v grep | awk '{print $2}' | xargs kill -9
+}
+
+create_topic() {
+    this_topic_to_create=$1
+    this_zk_conn_str=$2
+    this_replica_factor=$3
+
+    info "creating topic [$this_topic_to_create] on [$this_zk_conn_str]"
+    $base_dir/../../bin/kafka-create-topic.sh --topic $this_topic_to_create \
+        --zookeeper $this_zk_conn_str --replica $this_replica_factor
+}
+
+validate_results() {
+
+    echo
+    info "========================================================"
+    info "VALIDATING TEST RESULTS"
+    info "========================================================"
+
+    # get the checksums and sizes of the replica data files
+    for ((i=1; i<=$num_kafka_server; i++))
+    do
+        first_data_file_dir=${kafka_data_log_dirs[$i]}/${test_topic}-0
+        first_data_file=`ls ${first_data_file_dir} | head -1`
+        first_data_file_pathname=${first_data_file_dir}/$first_data_file
+        kafka_first_data_file_sizes[$i]=`stat -c%s ${first_data_file_pathname}`
+        kafka_first_data_file_checksums[$i]=`cksum ${first_data_file_pathname} | awk '{print $1}'`
+        info "## broker[$i] data file: ${first_data_file_pathname} : [${kafka_first_data_file_sizes[$i]}]"
+        info "##     ==> crc ${kafka_first_data_file_checksums[$i]}"
+    done
+
+    # get the checksums from messages produced and consumed
+    grep checksum $console_consumer_log_pathname | tr -d ' ' | awk -F ':' '{print $2}' > $console_consumer_crc_log_pathname
+    grep checksum $producer_perf_log_pathname | tr ' ' '\n' | grep checksum | awk -F ':' '{print $2}' > $producer_perf_crc_log_pathname
+
+    sort $console_consumer_crc_log_pathname > $console_consumer_crc_sorted_log_pathname
+    sort $producer_perf_crc_log_pathname    > $producer_perf_crc_sorted_log_pathname
+
+    sort -u $console_consumer_crc_sorted_log_pathname > $console_consumer_crc_sorted_uniq_log_pathname
+    sort -u $producer_perf_crc_sorted_log_pathname    > $producer_perf_crc_sorted_uniq_log_pathname
+
+    msg_count_from_console_consumer=`cat $console_consumer_crc_log_pathname | wc -l | tr -d ' '`
+    uniq_msg_count_from_console_consumer=`cat $console_consumer_crc_sorted_uniq_log_pathname | wc -l | tr -d ' '`
+
+    msg_count_from_producer_perf=`cat $producer_perf_crc_log_pathname | wc -l | tr -d ' '`
+    uniq_msg_count_from_producer_perf=`cat $producer_perf_crc_sorted_uniq_log_pathname | wc -l | tr -d ' '`
+
+    # report the findings
+    echo
+    info "## no. of messages published            : $msg_count_from_producer_perf"
+    info "## producer unique msg published        : $uniq_msg_count_from_producer_perf"
+    info "## console consumer msg rec'd           : $msg_count_from_console_consumer"
+    info "## console consumer unique msg rec'd    : $uniq_msg_count_from_console_consumer"
+    echo
+
+    validation_start_unix_ts=`date +%s`
+    curr_unix_ts=`date +%s`
+    size_unmatched_idx=1
+    while [[ $(( $curr_unix_ts - $validation_start_unix_ts )) -le $max_wait_for_consumer_complete && $size_unmatched_idx -gt 0 ]]
+    do
+        info "wait 5s (up to ${max_wait_for_consumer_complete}s) and check replicas data sizes"
+        sleep 5
+
+        # re-read the current sizes of the replicas' first data files so that
+        # this retry loop compares fresh values rather than the ones captured
+        # before the loop started
+        for ((i=1; i<=$num_kafka_server; i++))
+        do
+            first_data_file_dir=${kafka_data_log_dirs[$i]}/${test_topic}-0
+            first_data_file=`ls ${first_data_file_dir} | head -1`
+            kafka_first_data_file_sizes[$i]=`stat -c%s ${first_data_file_dir}/${first_data_file}`
+        done
+
+        first_element_value=${kafka_first_data_file_sizes[1]}
+        for ((i=2; i<=${#kafka_first_data_file_sizes[@]}; i++))
+        do
+            if [ $first_element_value -ne ${kafka_first_data_file_sizes[$i]} ]; then
+                size_unmatched_idx=1
+                break
+            else
+                size_unmatched_idx=0
+            fi
+        done
+
+        curr_unix_ts=`date +%s`
+    done
+
+    # validate that sizes of all replicas should match
+    first_element_value=${kafka_first_data_file_sizes[1]}
+    for ((i=2; i<=${#kafka_first_data_file_sizes[@]}; i++))
+    do
+        if [ $first_element_value -ne ${kafka_first_data_file_sizes[$i]} ]; then
+            info "## FAILURE: Unmatched size found"
+            test_failure_counter=$(( $test_failure_counter + 1 ))
+        fi
+    done
+
+    # validate that checksums of all replicas should match
+    first_element_value=${kafka_first_data_file_checksums[1]}
+    for ((i=2; i<=${#kafka_first_data_file_checksums[@]}; i++))
+    do
+        if [ $first_element_value -ne ${kafka_first_data_file_checksums[$i]} ]; then
+            info "## FAILURE: Unmatched checksum found"
+            test_failure_counter=$(( $test_failure_counter + 1 ))
+        fi
+    done
+
+    # validate that there is no data loss
+    if [ $uniq_msg_count_from_producer_perf -ne $uniq_msg_count_from_console_consumer ]; then
+        test_failure_counter=$(( $test_failure_counter + 1 ))
+    fi
+
+    # report PASSED or FAILED
+    info "========================================================"
+    if [ $test_failure_counter -eq 0 ]; then
+        info "## Test PASSED"
+    else
+        info "## Test FAILED"
+    fi
+    info "========================================================"
+}
+
+
+start_test() {
+    echo
+    info "======================================="
+    info "####  Kafka Replicas System Test   ####"
+    info "======================================="
+    echo
+
+    # Ctrl-c trap. Catches INT signal
+    trap "force_shutdown_producer; force_shutdown_consumer; shutdown_servers; exit 0" INT
+
+    generate_kafka_properties_files $base_dir_full_path $num_kafka_server $my_brokerid_to_start $my_server_port_to_start
+
+    initialize
+
+    cleanup
+    sleep 2
+
+    start_zk
+    sleep 2
+
+    start_servers_cluster
+    sleep 2
+
+    create_topic $test_topic localhost:$zk_port $replica_factor 2> $this_test_stderr_output_log_pathname
+
+    info "sleeping for 5s"
+    sleep 5 
+    echo
+
+    for ((i=1; i<=$num_kafka_server; i++))
+    do
+        info "kafka server [$i] - reading leader"
+        get_leader_brokerid
+        ldr_bkr_id=$?
+        info "leader broker id: $ldr_bkr_id"
+
+        svr_idx=$(($ldr_bkr_id + 1))
+
+        # ==========================================================
+        # If KAFKA-350 is fixed, uncomment the following 3 lines to
+        # STOP the server for failure test
+        # ==========================================================
+        #stop_server $svr_idx
+        #info "sleeping for 10s"
+        #sleep 10
+
+        start_console_consumer $test_topic localhost:$zk_port
+        info "sleeping for 5s"
+        sleep 5
+
+        start_producer_perf $test_topic localhost:$zk_port $producer_msg_batch_size
+        info "sleeping for 15s"
+        sleep 15
+        echo
+
+        # ==========================================================
+        # If KAFKA-350 is fixed, uncomment the following 3 lines to
+        # START the server for failure test
+        # ==========================================================
+        #start_server $svr_idx
+        #info "sleeping for 30s"
+        #sleep 30
+    done
+
+    validate_results
+    echo
+
+    shutdown_servers
+    echo
+}
+
+# =================================================
+# Main Test
+# =================================================
+
+start_test

Propchange: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/consumer.properties?rev=1350316&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/consumer.properties (added)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/consumer.properties Thu Jun 14 16:23:30 2012
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=127.0.0.1:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=test-consumer-group
+
+#consumer timeout
+#consumer.timeout.ms=5000

Added: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/log4j.properties?rev=1350316&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/log4j.properties (added)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/log4j.properties Thu Jun 14 16:23:30 2012
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+#log4j.appender.fileAppender=org.apache.log4j.FileAppender
+#log4j.appender.fileAppender.File=kafka-request.log
+#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
+
+
+# Turn on all our debugging info
+#log4j.logger.kafka=INFO
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+
+# to print message checksum from ProducerPerformance
+log4j.logger.kafka.perf=DEBUG
+log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
+

Added: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/producer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/producer.properties?rev=1350316&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/producer.properties (added)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/producer.properties Thu Jun 14 16:23:30 2012
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.producer.ProducerConfig for more details
+
+############################# Producer Basics #############################
+
+# need to set either broker.list or zk.connect
+
+# configure brokers statically
+# format: brokerid1:host1:port1,brokerid2:host2:port2 ...
+#broker.list=0:localhost:9092
+
+# discover brokers from ZK
+zk.connect=localhost:2181
+
+# zookeeper session timeout; default is 6000
+#zk.sessiontimeout.ms=
+
+# the max time that the client waits to establish a connection to zookeeper; default is 6000
+#zk.connectiontimeout.ms=
+
+# name of the partitioner class for partitioning events; default partition spreads data randomly
+#partitioner.class=
+
+# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
+producer.type=sync
+
+# specify the compression codec for all data generated: 0: no compression, 1: gzip
+compression.codec=0
+
+# message encoder
+serializer.class=kafka.serializer.StringEncoder
+
+# allow topic level compression
+#compressed.topics=
+
+# max message size; messages larger than that size are discarded; default is 1000000
+#max.message.size=
+
+
+############################# Async Producer #############################
+# maximum time, in milliseconds, for buffering data on the producer queue 
+#queue.time=
+
+# the maximum size of the blocking queue for buffering on the producer 
+#queue.size=
+
+# Timeout for event enqueue:
+# 0: events will be enqueued immediately or dropped if the queue is full
+# -ve: enqueue will block indefinitely if the queue is full
+# +ve: enqueue will block up to this many milliseconds if the queue is full
+#queue.enqueueTimeout.ms=
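+# (for example, a hypothetical setting of queue.enqueueTimeout.ms=500 would
+# block a full-queue enqueue for up to 500 ms before giving up)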
+
+# the number of messages batched at the producer 
+#batch.size=
+
+# the callback handler for one or multiple events 
+#callback.handler=
+
+# properties required to initialize the callback handler 
+#callback.handler.props=
+
+# the handler for events 
+#event.handler=
+
+# properties required to initialize the event handler 
+#event.handler.props=
+

Added: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/server.properties?rev=1350316&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/server.properties (added)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/server.properties Thu Jun 14 16:23:30 2012
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+brokerid=0
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
+# may not be what you want.
+#hostname=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9091
+
+# The number of threads handling network requests
+network.threads=2
+ 
+# The number of threads doing disk I/O
+io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+max.socket.request.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka_server_1_logs
+
+# The number of logical partitions per topic per server. More partitions allow greater parallelism
+# for consumption, but also mean more files.
+num.partitions=1
+
+# Overrides for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+#    3. Throughput: The flush is generally the most expensive operation. 
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
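+# (with the values below, a flush is forced once 10000 messages accumulate
+# or after a message has sat unflushed for about 1000 ms, whichever comes
+# first)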
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.default.flush.interval.ms=1000
+
+# Per-topic overrides for log.default.flush.interval.ms
+#topic.flush.intervals.ms=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.default.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.size.
+#log.retention.size=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.file.size=536870912
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zk.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000

Added: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/zookeeper.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/zookeeper.properties?rev=1350316&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/zookeeper.properties (added)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/zookeeper.properties Thu Jun 14 16:23:30 2012
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper_source
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0


