kafka-commits mailing list archives

From srihar...@apache.org
Subject [4/4] kafka git commit: KAFKA-4345; Run ducktape tests for each pull request
Date Thu, 24 Nov 2016 04:49:06 GMT
KAFKA-4345; Run ducktape tests for each pull request

As of now, the ducktape tests that we have for Kafka are not run for pull requests. We can run these tests using Travis CI. Here is a sample run:
https://travis-ci.org/raghavgautam/kafka/builds/170574293
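
The Travis splits defined in the new .travis.yml can also be exercised locally
via the helper script added in this patch, e.g. (a sketch based on the README
changes below; assumes docker is installed and running on the development box):

    # run a single test split, the same way the travis job does
    TC_PATHS="tests/kafkatest/tests/client1" bash tests/travis/run_tests.sh

    # run everything with ducktape debug output enabled
    _DUCKTAPE_OPTIONS="--debug" bash tests/travis/run_tests.sh | tee debug_logs.txt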

Author: Raghav Kumar Gautam <raghav@apache.org>

Reviewers: Sriharsha Chintalapani <harsha@hortonworks.com>

Closes #2064 from raghavgautam/trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e035fc03
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e035fc03
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e035fc03

Branch: refs/heads/trunk
Commit: e035fc039598127e88f31739458f705290b1fdba
Parents: 724cddb
Author: Raghav Kumar Gautam <raghav@apache.org>
Authored: Wed Nov 23 20:48:58 2016 -0800
Committer: Sriharsha Chintalapani <harsha@hortonworks.com>
Committed: Wed Nov 23 20:48:58 2016 -0800

----------------------------------------------------------------------
 .travis.yml                                     |  48 +++
 gradle/wrapper/gradle-wrapper.jar               | Bin 0 -> 52928 bytes
 gradle/wrapper/gradle-wrapper.properties        |   6 +
 tests/README.md                                 |  37 +++
 tests/cluster_file.json                         |  97 ++++++
 tests/kafkatest/directory_layout/kafka_path.py  |  10 +-
 tests/kafkatest/services/mirror_maker.py        |   4 +-
 tests/kafkatest/tests/client/__init__.py        |  14 -
 .../kafkatest/tests/client/compression_test.py  |  85 ------
 .../client/consumer_rolling_upgrade_test.py     |  82 -----
 tests/kafkatest/tests/client/consumer_test.py   | 297 -------------------
 .../tests/client/message_format_change_test.py  |  90 ------
 tests/kafkatest/tests/client/quota_test.py      | 219 --------------
 tests/kafkatest/tests/client1/__init__.py       |   0
 .../client1/consumer_rolling_upgrade_test.py    |  82 +++++
 .../tests/client1/message_format_change_test.py |  90 ++++++
 tests/kafkatest/tests/client1/quota_test.py     | 219 ++++++++++++++
 tests/kafkatest/tests/client2/__init__.py       |  14 +
 .../kafkatest/tests/client2/compression_test.py |  85 ++++++
 tests/kafkatest/tests/client2/consumer_test.py  | 297 +++++++++++++++++++
 tests/kafkatest/tests/core/__init__.py          |  14 -
 .../core/compatibility_test_new_broker_test.py  |  80 -----
 .../tests/core/consumer_group_command_test.py   | 106 -------
 .../tests/core/get_offset_shell_test.py         |  91 ------
 tests/kafkatest/tests/core/mirror_maker_test.py | 179 -----------
 .../tests/core/reassign_partitions_test.py      | 110 -------
 tests/kafkatest/tests/core/replication_test.py  | 154 ----------
 .../tests/core/security_rolling_upgrade_test.py | 190 ------------
 tests/kafkatest/tests/core/security_test.py     | 106 -------
 .../tests/core/simple_consumer_shell_test.py    |  75 -----
 tests/kafkatest/tests/core/throttling_test.py   | 173 -----------
 tests/kafkatest/tests/core/upgrade_test.py      | 128 --------
 .../core/zookeeper_security_upgrade_test.py     | 115 -------
 tests/kafkatest/tests/core1/__init__.py         |   0
 .../tests/core1/consumer_group_command_test.py  | 106 +++++++
 .../tests/core1/get_offset_shell_test.py        |  91 ++++++
 .../tests/core1/reassign_partitions_test.py     | 110 +++++++
 .../tests/core1/simple_consumer_shell_test.py   |  75 +++++
 tests/kafkatest/tests/core1/throttling_test.py  | 173 +++++++++++
 tests/kafkatest/tests/core2/__init__.py         |  14 +
 .../core2/compatibility_test_new_broker_test.py |  80 +++++
 tests/kafkatest/tests/mirror_maker/__init__.py  |   0
 .../tests/mirror_maker/mirror_maker_test.py     | 179 +++++++++++
 .../kafkatest/tests/produce_consume_validate.py |   3 +-
 tests/kafkatest/tests/replication/__init__.py   |   0
 .../tests/replication/replication_test.py       | 154 ++++++++++
 tests/kafkatest/tests/security1/__init__.py     |   0
 .../kafkatest/tests/security1/security_test.py  | 106 +++++++
 .../zookeeper_security_upgrade_test.py          | 115 +++++++
 tests/kafkatest/tests/security2/__init__.py     |   0
 .../security2/security_rolling_upgrade_test.py  | 190 ++++++++++++
 tests/kafkatest/tests/upgrade/__init__.py       |   0
 tests/kafkatest/tests/upgrade/upgrade_test.py   | 128 ++++++++
 tests/travis/Dockerfile                         |  38 +++
 tests/travis/run_tests.sh                       |  58 ++++
 tests/travis/ssh/authorized_keys                |  15 +
 tests/travis/ssh/config                         |  21 ++
 tests/travis/ssh/id_rsa                         |  27 ++
 tests/travis/ssh/id_rsa.pub                     |   1 +
 59 files changed, 2665 insertions(+), 2316 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..fa3ab0b
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,48 @@
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+sudo: required
+dist: trusty
+language: java
+
+# TODO enable failing splits after they have been stabilized
+env:
+  - TC_PATHS="tests/kafkatest/tests/client1"
+  - TC_PATHS="tests/kafkatest/tests/client2"
+# - TC_PATHS="tests/kafkatest/tests/connect tests/kafkatest/tests/streams tests/kafkatest/tests/tools"
+# - TC_PATHS="tests/kafkatest/tests/mirror_maker"
+# - TC_PATHS="tests/kafkatest/tests/replication"
+# - TC_PATHS="tests/kafkatest/tests/upgrade"
+  - TC_PATHS="tests/kafkatest/tests/security1"
+# - TC_PATHS="tests/kafkatest/tests/security2"
+# - TC_PATHS="tests/kafkatest/tests/core1"
+  - TC_PATHS="tests/kafkatest/tests/core2"
+
+jdk:
+  - oraclejdk8
+
+before_install:
+
+script:
+  - ./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh
+
+services:
+  - docker
+
+before_cache:
+  - rm -f  $HOME/.gradle/caches/modules-2/modules-2.lock
+  - rm -fr $HOME/.gradle/caches/*/plugin-resolution/
+cache:
+  directories:
+    - "$HOME/.m2/repository"
+    - "$HOME/.gradle/caches/"
+    - "$HOME/.gradle/wrapper/"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/gradle/wrapper/gradle-wrapper.jar
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
new file mode 100644
index 0000000..6ffa237
Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/gradle/wrapper/gradle-wrapper.properties
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
new file mode 100644
index 0000000..cde46f5
--- /dev/null
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -0,0 +1,6 @@
+#Fri Oct 07 16:09:33 PDT 2016
+distributionBase=GRADLE_USER_HOME
+distributionPath=wrapper/dists
+zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-3.0-bin.zip

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/README.md
----------------------------------------------------------------------
diff --git a/tests/README.md b/tests/README.md
index 098922f..9056b99 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -6,6 +6,43 @@ This directory contains Kafka system integration and performance tests.
 (ducktape is a distributed testing framework which provides test runner,
 result reporter and utilities to pull up and tear down services.)
 
+Running tests using docker
+--------------------------
+Docker is used for running the Kafka system tests on Travis CI. The exact same setup can also be used for development purposes.
+
+* Run all tests
+```
+bash tests/travis/run_tests.sh
+```
+* Run all tests with debug on (warning: this will produce a lot of logs)
+```
+_DUCKTAPE_OPTIONS="--debug" bash tests/travis/run_tests.sh | tee debug_logs.txt
+```
+* Run a subset of tests
+```
+TC_PATHS="tests/kafkatest/tests/streams tests/kafkatest/tests/tools" bash tests/travis/run_tests.sh
+```
+
+Examining CI run
+----------------
+* Set BUILD_ID to the Travis CI build id. E.g. the build id is 169519874 for the following build:
+```
+https://travis-ci.org/raghavgautam/kafka/builds/169519874
+```
+
+* Getting the number of tests that were actually run
+```
+for id in $(curl -sSL https://api.travis-ci.org/builds/$BUILD_ID | jq '.matrix|map(.id)|.[]'); do curl -sSL "https://api.travis-ci.org/jobs/$id/log.txt?deansi=true" ; done | egrep 'SerialTestRunner.*setting up' | wc
+```
+* Getting the number of tests that passed
+```
+for id in $(curl -sSL https://api.travis-ci.org/builds/$BUILD_ID | jq '.matrix|map(.id)|.[]'); do curl -sSL "https://api.travis-ci.org/jobs/$id/log.txt?deansi=true" ; done | egrep 'SerialTestRunner.*PASS' | wc
+```
+* Getting all the logs produced from a run
+```
+for id in $(curl -sSL https://api.travis-ci.org/builds/169519874 | jq '.matrix|map(.id)|.[]'); do curl -sSL "https://api.travis-ci.org/jobs/$id/log.txt?deansi=true" ; done
+```
+
 Local Quickstart
 ----------------
 This quickstart will help you run the Kafka system tests on your local machine. Note this requires bringing up a cluster of virtual machines on your local computer, which is memory intensive; it currently requires around 10G RAM.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/cluster_file.json
----------------------------------------------------------------------
diff --git a/tests/cluster_file.json b/tests/cluster_file.json
new file mode 100644
index 0000000..dbf3f04
--- /dev/null
+++ b/tests/cluster_file.json
@@ -0,0 +1,97 @@
+{
+  "_comment": [
+    "Licensed to the Apache Software Foundation (ASF) under one or more",
+    "contributor license agreements.  See the NOTICE file distributed with",
+    "this work for additional information regarding copyright ownership.",
+    "The ASF licenses this file to You under the Apache License, Version 2.0",
+    "(the \"License\"); you may not use this file except in compliance with",
+    "the License.  You may obtain a copy of the License at",
+    "",
+    "http://www.apache.org/licenses/LICENSE-2.0",
+    "",
+    "Unless required by applicable law or agreed to in writing, software",
+    "distributed under the License is distributed on an \"AS IS\" BASIS,",
+    "WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.",
+    "See the License for the specific language governing permissions and",
+    "limitations under the License."
+  ],
+  "nodes": [
+    {
+      "hostname": "knode02.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode02.knw"
+    },
+    {
+      "hostname": "knode03.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode03.knw"
+    },
+    {
+      "hostname": "knode04.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode04.knw"
+    },
+    {
+      "hostname": "knode05.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode05.knw"
+    },
+    {
+      "hostname": "knode06.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode06.knw"
+    },
+    {
+      "hostname": "knode07.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode07.knw"
+    },
+    {
+      "hostname": "knode08.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode08.knw"
+    },
+    {
+      "hostname": "knode09.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode09.knw"
+    },
+    {
+      "hostname": "knode10.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode10.knw"
+    },
+    {
+      "hostname": "knode11.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode11.knw"
+    },
+    {
+      "hostname": "knode12.knw",
+      "user": "root",
+      "ssh_args": "",
+      "ssh_hostname": "",
+      "externally_routable_ip": "knode12.knw"
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/directory_layout/kafka_path.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/directory_layout/kafka_path.py b/tests/kafkatest/directory_layout/kafka_path.py
index 0e60aff..9688174 100644
--- a/tests/kafkatest/directory_layout/kafka_path.py
+++ b/tests/kafkatest/directory_layout/kafka_path.py
@@ -44,11 +44,11 @@ TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME = "tools-dependant-libs"
 
 JARS = {
     "trunk": {
-        CORE_JAR_NAME: "core/build/*/*.jar",
-        CORE_LIBS_JAR_NAME: "core/build/libs/*.jar",
-        CORE_DEPENDANT_TEST_LIBS_JAR_NAME: "core/build/dependant-testlibs/*.jar",
-        TOOLS_JAR_NAME: "tools/build/libs/kafka-tools*.jar",
-        TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME: "tools/build/dependant-libs*/*.jar"
+        CORE_JAR_NAME: "libs/*.jar",
+        CORE_LIBS_JAR_NAME: "libs/*.jar",
+        CORE_DEPENDANT_TEST_LIBS_JAR_NAME: "libs/*.jar",
+        TOOLS_JAR_NAME: "libs/*.jar",
+        TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME: "libs/*.jar"
     }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index 14af4cf..626a0ff 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -180,13 +180,13 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
         cmd = self.start_cmd(node)
         self.logger.debug("Mirror maker command: %s", cmd)
         node.account.ssh(cmd, allow_fail=False)
-        wait_until(lambda: self.alive(node), timeout_sec=10, backoff_sec=.5,
+        wait_until(lambda: self.alive(node), timeout_sec=30, backoff_sec=.5,
                    err_msg="Mirror maker took to long to start.")
         self.logger.debug("Mirror maker is alive")
 
     def stop_node(self, node, clean_shutdown=True):
         node.account.kill_process("java", allow_fail=True, clean_shutdown=clean_shutdown)
-        wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.5,
+        wait_until(lambda: not self.alive(node), timeout_sec=30, backoff_sec=.5,
                    err_msg="Mirror maker took to long to stop.")
 
     def clean_node(self, node):

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/__init__.py b/tests/kafkatest/tests/client/__init__.py
deleted file mode 100644
index ec20143..0000000
--- a/tests/kafkatest/tests/client/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client/compression_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/compression_test.py b/tests/kafkatest/tests/client/compression_test.py
deleted file mode 100644
index 0de53ae..0000000
--- a/tests/kafkatest/tests/client/compression_test.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int_with_prefix
-
-class CompressionTest(ProduceConsumeValidateTest):
-    """
-    These tests validate produce / consume for compressed topics.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(CompressionTest, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 10,
-                                                                    "replication-factor": 1}})
-        self.num_partitions = 10
-        self.timeout_sec = 60
-        self.producer_throughput = 1000
-        self.num_producers = 4
-        self.messages_per_producer = 1000
-        self.num_consumers = 1
-
-    def setUp(self):
-        self.zk.start()
-
-    def min_cluster_size(self):
-        # Override this since we're adding services outside of the constructor
-        return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
-    @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=True)
-    @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=False)
-    def test_compressed_topic(self, compression_types, new_consumer):
-        """Test produce => consume => validate for compressed topics
-        Setup: 1 zk, 1 kafka node, 1 topic with partitions=10, replication-factor=1
-
-        compression_types parameter gives a list of compression types (or no compression if
-        "none"). Each producer in a VerifiableProducer group (num_producers = 4) will use a
-        compression type from the list based on producer's index in the group.
-
-            - Produce messages in the background
-            - Consume messages in the background
-            - Stop producing, and finish consuming
-            - Validate that every acked message was consumed
-        """
-
-        self.kafka.security_protocol = "PLAINTEXT"
-        self.kafka.interbroker_security_protocol = self.kafka.security_protocol
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
-                                           self.topic, throughput=self.producer_throughput,
-                                           message_validator=is_int_with_prefix,
-                                           compression_types=compression_types)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic,
-                                        new_consumer=new_consumer, consumer_timeout_ms=60000,
-                                        message_validator=is_int_with_prefix)
-        self.kafka.start()
-
-        self.run_produce_consume_validate(lambda: wait_until(
-            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
-            timeout_sec=120, backoff_sec=1,
-            err_msg="Producer did not produce all messages in reasonable amount of time"))
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
deleted file mode 100644
index 3cd3c7c..0000000
--- a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
+++ /dev/null
@@ -1,82 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.utils.util import wait_until
-
-from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
-from kafkatest.services.kafka import TopicPartition
-
-class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
-    TOPIC = "test_topic"
-    NUM_PARTITIONS = 4
-    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
-    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
-
-    def __init__(self, test_context):
-        super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0,
-                                                         num_zk=1, num_brokers=1, topics={
-            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
-        })
-
-    def _verify_range_assignment(self, consumer):
-        # range assignment should give us two partition sets: (0, 1) and (2, 3)
-        assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
-        assert assignment == set([
-            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
-            frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
-
-    def _verify_roundrobin_assignment(self, consumer):
-        assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
-        assert assignment == set([
-            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
-            frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
-
-    def rolling_update_test(self):
-        """
-        Verify rolling updates of partition assignment strategies works correctly. In this
-        test, we use a rolling restart to change the group's assignment strategy from "range" 
-        to "roundrobin." We verify after every restart that all members are still in the group
-        and that the correct assignment strategy was used.
-        """
-
-        # initialize the consumer using range assignment
-        consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE)
-
-        consumer.start()
-        self.await_all_members(consumer)
-        self._verify_range_assignment(consumer)
-
-        # change consumer configuration to prefer round-robin assignment, but still support range assignment
-        consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
-
-        # restart one of the nodes and verify that we are still using range assignment
-        consumer.stop_node(consumer.nodes[0])
-        consumer.start_node(consumer.nodes[0])
-        self.await_all_members(consumer)
-        self._verify_range_assignment(consumer)
-        
-        # now restart the other node and verify that we have switched to round-robin
-        consumer.stop_node(consumer.nodes[1])
-        consumer.start_node(consumer.nodes[1])
-        self.await_all_members(consumer)
-        self._verify_roundrobin_assignment(consumer)
-
-        # if we want, we can now drop support for range assignment
-        consumer.assignment_strategy = self.ROUND_ROBIN
-        for node in consumer.nodes:
-            consumer.stop_node(node)
-            consumer.start_node(node)
-            self.await_all_members(consumer)
-            self._verify_roundrobin_assignment(consumer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
deleted file mode 100644
index 534f65c..0000000
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ /dev/null
@@ -1,297 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import matrix
-from ducktape.utils.util import wait_until
-
-from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
-from kafkatest.services.kafka import TopicPartition
-
-import signal
-
-class OffsetValidationTest(VerifiableConsumerTest):
-    TOPIC = "test_topic"
-    NUM_PARTITIONS = 1
-
-    def __init__(self, test_context):
-        super(OffsetValidationTest, self).__init__(test_context, num_consumers=3, num_producers=1,
-                                                     num_zk=1, num_brokers=2, topics={
-            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 2 }
-        })
-
-    def rolling_bounce_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
-        for _ in range(num_bounces):
-            for node in consumer.nodes:
-                consumer.stop_node(node, clean_shutdown)
-
-                wait_until(lambda: len(consumer.dead_nodes()) == 1,
-                           timeout_sec=self.session_timeout_sec+5,
-                           err_msg="Timed out waiting for the consumer to shutdown")
-
-                consumer.start_node(node)
-
-                self.await_all_members(consumer)
-                self.await_consumed_messages(consumer)
-
-    def bounce_all_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
-        for _ in range(num_bounces):
-            for node in consumer.nodes:
-                consumer.stop_node(node, clean_shutdown)
-
-            wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, timeout_sec=10,
-                       err_msg="Timed out waiting for the consumers to shutdown")
-            
-            for node in consumer.nodes:
-                consumer.start_node(node)
-
-            self.await_all_members(consumer)
-            self.await_consumed_messages(consumer)
-
-    def rolling_bounce_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
-        for _ in range(num_bounces):
-            for node in self.kafka.nodes:
-                self.kafka.restart_node(node, clean_shutdown=True)
-                self.await_all_members(consumer)
-                self.await_consumed_messages(consumer)
-
-    def setup_consumer(self, topic, **kwargs):
-        # collect verifiable consumer events since this makes debugging much easier
-        consumer = super(OffsetValidationTest, self).setup_consumer(topic, **kwargs)
-        self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
-        return consumer
-
-    def test_broker_rolling_bounce(self):
-        """
-        Verify correct consumer behavior when the brokers are consecutively restarted.
-
-        Setup: single Kafka cluster with one producer writing messages to a single topic with one
-        partition, an a set of consumers in the same group reading from the same topic.
-
-        - Start a producer which continues producing new messages throughout the test.
-        - Start up the consumers and wait until they've joined the group.
-        - In a loop, restart each broker consecutively, waiting for the group to stabilize between
-          each broker restart.
-        - Verify delivery semantics according to the failure type and that the broker bounces
-          did not cause unexpected group rebalances.
-        """
-        partition = TopicPartition(self.TOPIC, 0)
-        
-        producer = self.setup_producer(self.TOPIC)
-        consumer = self.setup_consumer(self.TOPIC)
-
-        producer.start()
-        self.await_produced_messages(producer)
-
-        consumer.start()
-        self.await_all_members(consumer)
-
-        num_rebalances = consumer.num_rebalances()
-        # TODO: make this test work with hard shutdowns, which probably requires
-        #       pausing before the node is restarted to ensure that any ephemeral
-        #       nodes have time to expire
-        self.rolling_bounce_brokers(consumer, clean_shutdown=True)
-        
-        unexpected_rebalances = consumer.num_rebalances() - num_rebalances
-        assert unexpected_rebalances == 0, \
-            "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances
-
-        consumer.stop_all()
-
-        assert consumer.current_position(partition) == consumer.total_consumed(), \
-            "Total consumed records did not match consumed position"
-
-    @matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])
-    def test_consumer_bounce(self, clean_shutdown, bounce_mode):
-        """
-        Verify correct consumer behavior when the consumers in the group are consecutively restarted.
-
-        Setup: single Kafka cluster with one producer and a set of consumers in one group.
-
-        - Start a producer which continues producing new messages throughout the test.
-        - Start up the consumers and wait until they've joined the group.
-        - In a loop, restart each consumer, waiting for each one to rejoin the group before
-          restarting the rest.
-        - Verify delivery semantics according to the failure type.
-        """
-        partition = TopicPartition(self.TOPIC, 0)
-        
-        producer = self.setup_producer(self.TOPIC)
-        consumer = self.setup_consumer(self.TOPIC)
-
-        producer.start()
-        self.await_produced_messages(producer)
-
-        consumer.start()
-        self.await_all_members(consumer)
-
-        if bounce_mode == "all":
-            self.bounce_all_consumers(consumer, clean_shutdown=clean_shutdown)
-        else:
-            self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
-                
-        consumer.stop_all()
-        if clean_shutdown:
-            # if the total records consumed matches the current position, we haven't seen any duplicates
-            # this can only be guaranteed with a clean shutdown
-            assert consumer.current_position(partition) == consumer.total_consumed(), \
-                "Total consumed records did not match consumed position"
-        else:
-            # we may have duplicates in a hard failure
-            assert consumer.current_position(partition) <= consumer.total_consumed(), \
-                "Current position greater than the total number of consumed records"
-
-    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
-    def test_consumer_failure(self, clean_shutdown, enable_autocommit):
-        partition = TopicPartition(self.TOPIC, 0)
-        
-        consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
-        producer = self.setup_producer(self.TOPIC)
-
-        consumer.start()
-        self.await_all_members(consumer)
-
-        partition_owner = consumer.owner(partition)
-        assert partition_owner is not None
-
-        # startup the producer and ensure that some records have been written
-        producer.start()
-        self.await_produced_messages(producer)
-
-        # stop the partition owner and await its shutdown
-        consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
-        wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) != None,
-                   timeout_sec=self.session_timeout_sec+5, err_msg="Timed out waiting for consumer to close")
-
-        # ensure that the remaining consumer does some work after rebalancing
-        self.await_consumed_messages(consumer, min_messages=1000)
-
-        consumer.stop_all()
-
-        if clean_shutdown:
-            # if the total records consumed matches the current position, we haven't seen any duplicates
-            # this can only be guaranteed with a clean shutdown
-            assert consumer.current_position(partition) == consumer.total_consumed(), \
-                "Total consumed records did not match consumed position"
-        else:
-            # we may have duplicates in a hard failure
-            assert consumer.current_position(partition) <= consumer.total_consumed(), \
-                "Current position greater than the total number of consumed records"
-
-        # if autocommit is not turned on, we can also verify the last committed offset
-        if not enable_autocommit:
-            assert consumer.last_commit(partition) == consumer.current_position(partition), \
-                "Last committed offset did not match last consumed position"
-
-
-    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
-    def test_broker_failure(self, clean_shutdown, enable_autocommit):
-        partition = TopicPartition(self.TOPIC, 0)
-        
-        consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
-        producer = self.setup_producer(self.TOPIC)
-
-        producer.start()
-        consumer.start()
-        self.await_all_members(consumer)
-
-        num_rebalances = consumer.num_rebalances()
-
-        # shutdown one of the brokers
-        # TODO: we need a way to target the coordinator instead of picking arbitrarily
-        self.kafka.signal_node(self.kafka.nodes[0], signal.SIGTERM if clean_shutdown else signal.SIGKILL)
-
-        # ensure that the consumers do some work after the broker failure
-        self.await_consumed_messages(consumer, min_messages=1000)
-
-        # verify that there were no rebalances on failover
-        assert num_rebalances == consumer.num_rebalances(), "Broker failure should not cause a rebalance"
-
-        consumer.stop_all()
-
-        # if the total records consumed matches the current position, we haven't seen any duplicates
-        assert consumer.current_position(partition) == consumer.total_consumed(), \
-            "Total consumed records did not match consumed position"
-
-        # if autocommit is not turned on, we can also verify the last committed offset
-        if not enable_autocommit:
-            assert consumer.last_commit(partition) == consumer.current_position(partition), \
-                "Last committed offset did not match last consumed position"
-
-    def test_group_consumption(self):
-        """
-        Verifies correct group rebalance behavior as consumers are started and stopped. 
-        In particular, this test verifies that the partition is readable after every
-        expected rebalance.
-
-        Setup: single Kafka cluster with a group of consumers reading from one topic
-        with one partition while the verifiable producer writes to it.
-
-        - Start the consumers one by one, verifying consumption after each rebalance
-        - Shutdown the consumers one by one, verifying consumption after each rebalance
-        """
-        consumer = self.setup_consumer(self.TOPIC)
-        producer = self.setup_producer(self.TOPIC)
-
-        partition = TopicPartition(self.TOPIC, 0)
-
-        producer.start()
-
-        for num_started, node in enumerate(consumer.nodes, 1):
-            consumer.start_node(node)
-            self.await_members(consumer, num_started)
-            self.await_consumed_messages(consumer)
-
-        for num_stopped, node in enumerate(consumer.nodes, 1):
-            consumer.stop_node(node)
-
-            if num_stopped < self.num_consumers:
-                self.await_members(consumer, self.num_consumers - num_stopped)
-                self.await_consumed_messages(consumer)
-
-        assert consumer.current_position(partition) == consumer.total_consumed(), \
-            "Total consumed records did not match consumed position"
-
-        assert consumer.last_commit(partition) == consumer.current_position(partition), \
-            "Last committed offset did not match last consumed position"
-
-
-class AssignmentValidationTest(VerifiableConsumerTest):
-    TOPIC = "test_topic"
-    NUM_PARTITIONS = 6
-
-    def __init__(self, test_context):
-        super(AssignmentValidationTest, self).__init__(test_context, num_consumers=3, num_producers=0,
-                                                num_zk=1, num_brokers=2, topics={
-            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },
-        })
-
-    @matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
-                                 "org.apache.kafka.clients.consumer.RoundRobinAssignor"])
-    def test_valid_assignment(self, assignment_strategy):
-        """
-        Verify assignment strategy correctness: each partition is assigned to exactly
-        one consumer instance.
-
-        Setup: single Kafka cluster with a set of consumers in the same group.
-
-        - Start the consumers one by one
-        - Validate assignment after every expected rebalance
-        """
-        consumer = self.setup_consumer(self.TOPIC, assignment_strategy=assignment_strategy)
-        for num_started, node in enumerate(consumer.nodes, 1):
-            consumer.start_node(node)
-            self.await_members(consumer, num_started)
-            assert self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment())
-            

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py
deleted file mode 100644
index a57c04b..0000000
--- a/tests/kafkatest/tests/client/message_format_change_test.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# Copyright 2015 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
-
-
-class MessageFormatChangeTest(ProduceConsumeValidateTest):
-
-    def __init__(self, test_context):
-        super(MessageFormatChangeTest, self).__init__(test_context=test_context)
-
-    def setUp(self):
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-            
-        self.zk.start()
-
-        # Producer and consumer
-        self.producer_throughput = 10000
-        self.num_producers = 1
-        self.num_consumers = 1
-        self.messages_per_producer = 100
-
-    def produce_and_consume(self, producer_version, consumer_version, group):
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
-                                           self.topic,
-                                           throughput=self.producer_throughput,
-                                           message_validator=is_int,
-                                           version=KafkaVersion(producer_version))
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
-                                        self.topic, new_consumer=False, consumer_timeout_ms=30000,
-                                        message_validator=is_int, version=KafkaVersion(consumer_version))
-        self.consumer.group_id = group
-        self.run_produce_consume_validate(lambda: wait_until(
-            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
-            timeout_sec=120, backoff_sec=1,
-            err_msg="Producer did not produce all messages in reasonable amount of time"))
-        
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
-    def test_compatibility(self, producer_version, consumer_version):
-        """ This tests performs the following checks:
-        The workload is a mix of 0.9.x and 0.10.x producers and consumers 
-        that produce to and consume from a 0.10.x cluster
-        1. initially the topic is using message format 0.9.0
-        2. change the message format version for topic to 0.10.0 on the fly.
-        3. change the message format version for topic back to 0.9.0 on the fly.
-        - The producers and consumers should not have any issue.
-        - Note that for 0.9.x consumers/producers we only do steps 1 and 2
-        """
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
-                                                                    "partitions": 3,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}})
-       
-        self.kafka.start()
-        self.logger.info("First format change to 0.9.0")
-        self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
-        self.produce_and_consume(producer_version, consumer_version, "group1")
-
-        self.logger.info("Second format change to 0.10.0")
-        self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
-        self.produce_and_consume(producer_version, consumer_version, "group2")
-
-        if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
-            self.logger.info("Third format change back to 0.9.0")
-            self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
-            self.produce_and_consume(producer_version, consumer_version, "group3")
-
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
deleted file mode 100644
index 1d31569..0000000
--- a/tests/kafkatest/tests/client/quota_test.py
+++ /dev/null
@@ -1,219 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.tests.test import Test
-from ducktape.mark import matrix, parametrize
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.performance import ProducerPerformanceService
-from kafkatest.services.console_consumer import ConsoleConsumer
-
-class QuotaConfig(object):
-    CLIENT_ID = 'client-id'
-    USER = 'user'
-    USER_CLIENT = '(user, client-id)'
-
-    LARGE_QUOTA = 1000 * 1000 * 1000
-    USER_PRINCIPAL = 'CN=systemtest'
-
-    def __init__(self, quota_type, override_quota, kafka):
-        if quota_type == QuotaConfig.CLIENT_ID:
-            if override_quota:
-                self.client_id = 'overridden_id'
-                self.producer_quota = 3750000
-                self.consumer_quota = 3000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', self.client_id])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
-            else:
-                self.client_id = 'default_id'
-                self.producer_quota = 2500000
-                self.consumer_quota = 2000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', 'overridden_id'])
-        elif quota_type == QuotaConfig.USER:
-            if override_quota:
-                self.client_id = 'some_id'
-                self.producer_quota = 3750000
-                self.consumer_quota = 3000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
-            else:
-                self.client_id = 'some_id'
-                self.producer_quota = 2500000
-                self.consumer_quota = 2000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
-        elif quota_type == QuotaConfig.USER_CLIENT:
-            if override_quota:
-                self.client_id = 'overridden_id'
-                self.producer_quota = 3750000
-                self.consumer_quota = 3000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', self.client_id])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
-            else:
-                self.client_id = 'default_id'
-                self.producer_quota = 2500000
-                self.consumer_quota = 2000000
-                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None, 'clients', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
-                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
-
-    def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args):
-        node = kafka.nodes[0]
-        cmd = "%s --zookeeper %s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \
-              (kafka.path.script("kafka-configs.sh", node), kafka.zk.connect_setting(), producer_byte_rate, consumer_byte_rate)
-        cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1])
-        if len(entity_args) > 2:
-            cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3])
-        node.account.ssh(cmd)
-
-    def entity_name_opt(self, name):
-        return " --entity-default" if name is None else " --entity-name " + name
-
-class QuotaTest(Test):
-    """
-    These tests verify that quota provides expected functionality -- they run
-    producer, broker, and consumer with different clientId and quota configuration and
-    check that the observed throughput is close to the value we expect.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(QuotaTest, self).__init__(test_context=test_context)
-
-        self.topic = 'test_topic'
-        self.logger.info('use topic ' + self.topic)
-
-        self.maximum_client_deviation_percentage = 100.0
-        self.maximum_broker_deviation_percentage = 5.0
-        self.num_records = 50000
-        self.record_size = 3000
-
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
-                                  security_protocol='SSL', authorizer_class_name='',
-                                  interbroker_security_protocol='SSL',
-                                  topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
-                                  jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
-                                                    'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
-                                  jmx_attributes=['OneMinuteRate'])
-        self.num_producers = 1
-        self.num_consumers = 2
-
-    def setUp(self):
-        self.zk.start()
-        self.kafka.start()
-
-    def min_cluster_size(self):
-        """Override this since we're adding services outside of the constructor"""
-        return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
-    @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
-    @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
-    def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
-        self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
-        producer_client_id = self.quota_config.client_id
-        consumer_client_id = self.quota_config.client_id
-
-        # Produce all messages
-        producer = ProducerPerformanceService(
-            self.test_context, producer_num, self.kafka,
-            topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id,
-            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_client_id], jmx_attributes=['outgoing-byte-rate'])
-
-        producer.run()
-
-        # Consume all messages
-        consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
-            consumer_timeout_ms=60000, client_id=consumer_client_id,
-            jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s' % consumer_client_id],
-            jmx_attributes=['bytes-consumed-rate'])
-        consumer.run()
-
-        for idx, messages in consumer.messages_consumed.iteritems():
-            assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx
-
-        success, msg = self.validate(self.kafka, producer, consumer)
-        assert success, msg
-
-    def validate(self, broker, producer, consumer):
-        """
-        For each client_id we validate that:
-        1) number of consumed messages equals number of produced messages
-        2) maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
-        3) maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
-        4) maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
-        5) maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
-        """
-        success = True
-        msg = ''
-
-        self.kafka.read_jmx_output_all_nodes()
-
-        # validate that number of consumed messages equals number of produced messages
-        produced_num = sum([value['records'] for value in producer.results])
-        consumed_num = sum([len(value) for value in consumer.messages_consumed.values()])
-        self.logger.info('producer produced %d messages' % produced_num)
-        self.logger.info('consumer consumed %d messages' % consumed_num)
-        if produced_num != consumed_num:
-            success = False
-            msg += "number of produced messages %d doesn't equal number of consumed messages %d" % (produced_num, consumed_num)
-
-        # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
-        producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
-        producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
-        producer_quota_bps = self.quota_config.producer_quota
-        self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
-        if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
-            success = False
-            msg += 'maximum producer throughput %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
-                   (producer_maximum_bps, producer_quota_bps, self.maximum_client_deviation_percentage)
-
-        # validate that maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
-        broker_byte_in_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate'
-        broker_maximum_byte_in_bps = broker.maximum_jmx_value[broker_byte_in_attribute_name]
-        self.logger.info('broker has maximum byte-in rate %.2f bps with producer quota %.2f bps' %
-                         (broker_maximum_byte_in_bps, producer_quota_bps))
-        if broker_maximum_byte_in_bps > producer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
-            success = False
-            msg += 'maximum broker byte-in rate %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
-                   (broker_maximum_byte_in_bps, producer_quota_bps, self.maximum_broker_deviation_percentage)
-
-        # validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
-        consumer_attribute_name = 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s:bytes-consumed-rate' % consumer.client_id
-        consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
-        consumer_quota_bps = self.quota_config.consumer_quota
-        self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps))
-        if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
-            success = False
-            msg += 'maximum consumer throughput %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
-                   (consumer_maximum_bps, consumer_quota_bps, self.maximum_client_deviation_percentage)
-
-        # validate that maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
-        broker_byte_out_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec:OneMinuteRate'
-        broker_maximum_byte_out_bps = broker.maximum_jmx_value[broker_byte_out_attribute_name]
-        self.logger.info('broker has maximum byte-out rate %.2f bps with consumer quota %.2f bps' %
-                         (broker_maximum_byte_out_bps, consumer_quota_bps))
-        if broker_maximum_byte_out_bps > consumer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
-            success = False
-            msg += 'maximum broker byte-out rate %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
-                   (broker_maximum_byte_out_bps, consumer_quota_bps, self.maximum_broker_deviation_percentage)
-
-        return success, msg
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client1/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client1/__init__.py b/tests/kafkatest/tests/client1/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client1/consumer_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client1/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client1/consumer_rolling_upgrade_test.py
new file mode 100644
index 0000000..3cd3c7c
--- /dev/null
+++ b/tests/kafkatest/tests/client1/consumer_rolling_upgrade_test.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition
+
+class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 4
+    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+
+    def __init__(self, test_context):
+        super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0,
+                                                         num_zk=1, num_brokers=1, topics={
+            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
+        })
+
+    def _verify_range_assignment(self, consumer):
+        # range assignment should give us two partition sets: (0, 1) and (2, 3)
+        assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
+        assert assignment == set([
+            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
+            frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
+
+    def _verify_roundrobin_assignment(self, consumer):
+        assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
+        assert assignment == set([
+            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
+            frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
+
+    def rolling_update_test(self):
+        """
+        Verify that a rolling update of the partition assignment strategy works correctly. In this
+        test, we use a rolling restart to change the group's assignment strategy from "range"
+        to "roundrobin". After every restart we verify that all members are still in the group
+        and that the expected assignment strategy was used.
+        """
+
+        # initialize the consumer using range assignment
+        consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE)
+
+        consumer.start()
+        self.await_all_members(consumer)
+        self._verify_range_assignment(consumer)
+
+        # change consumer configuration to prefer round-robin assignment, but still support range assignment
+        consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
+
+        # restart one of the nodes and verify that we are still using range assignment
+        consumer.stop_node(consumer.nodes[0])
+        consumer.start_node(consumer.nodes[0])
+        self.await_all_members(consumer)
+        self._verify_range_assignment(consumer)
+        
+        # now restart the other node and verify that we have switched to round-robin
+        consumer.stop_node(consumer.nodes[1])
+        consumer.start_node(consumer.nodes[1])
+        self.await_all_members(consumer)
+        self._verify_roundrobin_assignment(consumer)
+
+        # if we want, we can now drop support for range assignment
+        consumer.assignment_strategy = self.ROUND_ROBIN
+        for node in consumer.nodes:
+            consumer.stop_node(node)
+            consumer.start_node(node)
+            self.await_all_members(consumer)
+            self._verify_roundrobin_assignment(consumer)
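
The two assertion helpers above encode the expected layouts for a four-partition
topic split across two consumers. As a minimal sketch (not part of the patch; the
real logic lives in the client-side RangeAssignor and RoundRobinAssignor named in
the class constants), the following shows why range assignment yields (0, 1) and
(2, 3) while round-robin yields (0, 2) and (1, 3):

    # Illustrative sketch only; the actual assignors ship with the Java client.
    def range_assign(partitions, consumers):
        # contiguous chunks; earlier consumers receive any remainder
        per, extra = divmod(len(partitions), len(consumers))
        result, start = {}, 0
        for i, consumer in enumerate(sorted(consumers)):
            count = per + (1 if i < extra else 0)
            result[consumer] = partitions[start:start + count]
            start += count
        return result

    def roundrobin_assign(partitions, consumers):
        # deal partitions out one at a time across the sorted consumers
        ordered = sorted(consumers)
        result = dict((consumer, []) for consumer in ordered)
        for i, partition in enumerate(partitions):
            result[ordered[i % len(ordered)]].append(partition)
        return result

    print(range_assign([0, 1, 2, 3], ["consumer-1", "consumer-2"]))
    # {'consumer-1': [0, 1], 'consumer-2': [2, 3]}
    print(roundrobin_assign([0, 1, 2, 3], ["consumer-1", "consumer-2"]))
    # {'consumer-1': [0, 2], 'consumer-2': [1, 3]}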

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client1/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client1/message_format_change_test.py b/tests/kafkatest/tests/client1/message_format_change_test.py
new file mode 100644
index 0000000..a57c04b
--- /dev/null
+++ b/tests/kafkatest/tests/client1/message_format_change_test.py
@@ -0,0 +1,90 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+from kafkatest.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
+
+
+class MessageFormatChangeTest(ProduceConsumeValidateTest):
+
+    def __init__(self, test_context):
+        super(MessageFormatChangeTest, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+            
+        self.zk.start()
+
+        # Producer and consumer
+        self.producer_throughput = 10000
+        self.num_producers = 1
+        self.num_consumers = 1
+        self.messages_per_producer = 100
+
+    def produce_and_consume(self, producer_version, consumer_version, group):
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic,
+                                           throughput=self.producer_throughput,
+                                           message_validator=is_int,
+                                           version=KafkaVersion(producer_version))
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+                                        self.topic, new_consumer=False, consumer_timeout_ms=30000,
+                                        message_validator=is_int, version=KafkaVersion(consumer_version))
+        self.consumer.group_id = group
+        self.run_produce_consume_validate(lambda: wait_until(
+            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+            timeout_sec=120, backoff_sec=1,
+            err_msg="Producer did not produce all messages in reasonable amount of time"))
+        
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
+    def test_compatibility(self, producer_version, consumer_version):
+        """ This tests performs the following checks:
+        The workload is a mix of 0.9.x and 0.10.x producers and consumers 
+        that produce to and consume from a 0.10.x cluster
+        1. initially the topic is using message format 0.9.0
+        2. change the message format version for topic to 0.10.0 on the fly.
+        3. change the message format version for topic back to 0.9.0 on the fly.
+        - The producers and consumers should not have any issue.
+        - Note that for 0.9.x consumers/producers we only do steps 1 and 2
+        """
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
+                                                                    "partitions": 3,
+                                                                    "replication-factor": 3,
+                                                                    'configs': {"min.insync.replicas": 2}}})
+       
+        self.kafka.start()
+        self.logger.info("First format change to 0.9.0")
+        self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+        self.produce_and_consume(producer_version, consumer_version, "group1")
+
+        self.logger.info("Second format change to 0.10.0")
+        self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
+        self.produce_and_consume(producer_version, consumer_version, "group2")
+
+        if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
+            self.logger.info("Third format change back to 0.9.0")
+            self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+            self.produce_and_consume(producer_version, consumer_version, "group3")
+
+
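
The format switches above go through KafkaService.alter_message_format, whose
implementation is not part of this diff. Assuming it applies a topic-level config
change (an assumption; the helper lives under tests/kafkatest/services/kafka), its
effect should be roughly equivalent to a kafka-configs.sh invocation like the one
sketched here with placeholder topic, version, and ZooKeeper values:

    # Assumption: alter_message_format(topic, version) amounts to a topic-level
    # config change along these lines; all values below are placeholders.
    topic, version, zk = "test_topic", "0.9.0", "zk1:2181"
    cmd = ("kafka-configs.sh --zookeeper %s --entity-type topics --entity-name %s"
           " --alter --add-config message.format.version=%s" % (zk, topic, version))
    print(cmd)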

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client1/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client1/quota_test.py b/tests/kafkatest/tests/client1/quota_test.py
new file mode 100644
index 0000000..1d31569
--- /dev/null
+++ b/tests/kafkatest/tests/client1/quota_test.py
@@ -0,0 +1,219 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import matrix, parametrize
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.performance import ProducerPerformanceService
+from kafkatest.services.console_consumer import ConsoleConsumer
+
+class QuotaConfig(object):
+    CLIENT_ID = 'client-id'
+    USER = 'user'
+    USER_CLIENT = '(user, client-id)'
+
+    LARGE_QUOTA = 1000 * 1000 * 1000
+    USER_PRINCIPAL = 'CN=systemtest'
+
+    def __init__(self, quota_type, override_quota, kafka):
+        if quota_type == QuotaConfig.CLIENT_ID:
+            if override_quota:
+                self.client_id = 'overridden_id'
+                self.producer_quota = 3750000
+                self.consumer_quota = 3000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', self.client_id])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
+            else:
+                self.client_id = 'default_id'
+                self.producer_quota = 2500000
+                self.consumer_quota = 2000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', 'overridden_id'])
+        elif quota_type == QuotaConfig.USER:
+            if override_quota:
+                self.client_id = 'some_id'
+                self.producer_quota = 3750000
+                self.consumer_quota = 3000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
+            else:
+                self.client_id = 'some_id'
+                self.producer_quota = 2500000
+                self.consumer_quota = 2000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
+        elif quota_type == QuotaConfig.USER_CLIENT:
+            if override_quota:
+                self.client_id = 'overridden_id'
+                self.producer_quota = 3750000
+                self.consumer_quota = 3000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', self.client_id])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
+            else:
+                self.client_id = 'default_id'
+                self.producer_quota = 2500000
+                self.consumer_quota = 2000000
+                self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None, 'clients', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
+
+    def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args):
+        node = kafka.nodes[0]
+        cmd = "%s --zookeeper %s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \
+              (kafka.path.script("kafka-configs.sh", node), kafka.zk.connect_setting(), producer_byte_rate, consumer_byte_rate)
+        cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1])
+        if len(entity_args) > 2:
+            cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3])
+        node.account.ssh(cmd)
+
+    def entity_name_opt(self, name):
+        return " --entity-default" if name is None else " --entity-name " + name
+
+class QuotaTest(Test):
+    """
+    These tests verify that quotas provide the expected functionality: they run a
+    producer, broker, and consumer with different client-id and quota configurations and
+    check that the observed throughput stays close to the configured limit.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(QuotaTest, self).__init__(test_context=test_context)
+
+        self.topic = 'test_topic'
+        self.logger.info('use topic ' + self.topic)
+
+        self.maximum_client_deviation_percentage = 100.0
+        self.maximum_broker_deviation_percentage = 5.0
+        self.num_records = 50000
+        self.record_size = 3000
+
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
+                                  security_protocol='SSL', authorizer_class_name='',
+                                  interbroker_security_protocol='SSL',
+                                  topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
+                                  jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
+                                                    'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
+                                  jmx_attributes=['OneMinuteRate'])
+        self.num_producers = 1
+        self.num_consumers = 2
+
+    def setUp(self):
+        self.zk.start()
+        self.kafka.start()
+
+    def min_cluster_size(self):
+        """Override this since we're adding services outside of the constructor"""
+        return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+    @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
+    @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
+    def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
+        self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
+        producer_client_id = self.quota_config.client_id
+        consumer_client_id = self.quota_config.client_id
+
+        # Produce all messages
+        producer = ProducerPerformanceService(
+            self.test_context, producer_num, self.kafka,
+            topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id,
+            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_client_id], jmx_attributes=['outgoing-byte-rate'])
+
+        producer.run()
+
+        # Consume all messages
+        consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
+            consumer_timeout_ms=60000, client_id=consumer_client_id,
+            jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s' % consumer_client_id],
+            jmx_attributes=['bytes-consumed-rate'])
+        consumer.run()
+
+        for idx, messages in consumer.messages_consumed.iteritems():
+            assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx
+
+        success, msg = self.validate(self.kafka, producer, consumer)
+        assert success, msg
+
+    def validate(self, broker, producer, consumer):
+        """
+        For each client_id we validate that:
+        1) number of consumed messages equals number of produced messages
+        2) maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
+        3) maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
+        4) maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
+        5) maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
+        """
+        success = True
+        msg = ''
+
+        self.kafka.read_jmx_output_all_nodes()
+
+        # validate that number of consumed messages equals number of produced messages
+        produced_num = sum([value['records'] for value in producer.results])
+        consumed_num = sum([len(value) for value in consumer.messages_consumed.values()])
+        self.logger.info('producer produced %d messages' % produced_num)
+        self.logger.info('consumer consumed %d messages' % consumed_num)
+        if produced_num != consumed_num:
+            success = False
+            msg += "number of produced messages %d doesn't equal number of consumed messages %d" % (produced_num, consumed_num)
+
+        # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
+        producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
+        producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
+        producer_quota_bps = self.quota_config.producer_quota
+        self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
+        if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum producer throughput %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
+                   (producer_maximum_bps, producer_quota_bps, self.maximum_client_deviation_percentage)
+
+        # validate that maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
+        broker_byte_in_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate'
+        broker_maximum_byte_in_bps = broker.maximum_jmx_value[broker_byte_in_attribute_name]
+        self.logger.info('broker has maximum byte-in rate %.2f bps with producer quota %.2f bps' %
+                         (broker_maximum_byte_in_bps, producer_quota_bps))
+        if broker_maximum_byte_in_bps > producer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum broker byte-in rate %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
+                   (broker_maximum_byte_in_bps, producer_quota_bps, self.maximum_broker_deviation_percentage)
+
+        # validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
+        consumer_attribute_name = 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s:bytes-consumed-rate' % consumer.client_id
+        consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
+        consumer_quota_bps = self.quota_config.consumer_quota
+        self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps))
+        if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum consumer throughput %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
+                   (consumer_maximum_bps, consumer_quota_bps, self.maximum_client_deviation_percentage)
+
+        # validate that maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
+        broker_byte_out_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec:OneMinuteRate'
+        broker_maximum_byte_out_bps = broker.maximum_jmx_value[broker_byte_out_attribute_name]
+        self.logger.info('broker has maximum byte-out rate %.2f bps with consumer quota %.2f bps' %
+                         (broker_maximum_byte_out_bps, consumer_quota_bps))
+        if broker_maximum_byte_out_bps > consumer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
+            success = False
+            msg += 'maximum broker byte-out rate %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
+                   (broker_maximum_byte_out_bps, consumer_quota_bps, self.maximum_broker_deviation_percentage)
+
+        return success, msg
+
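
For the client-id override case exercised above, QuotaConfig.configure_quota shells
out to kafka-configs.sh twice: once to set the per-client override and once to raise
the client default to LARGE_QUOTA. A minimal sketch of the two resulting commands,
with a placeholder ZooKeeper connect string (the script path is actually resolved via
kafka.path.script in the test):

    # Commands built by configure_quota for the client-id override branch;
    # "zk1:2181" stands in for kafka.zk.connect_setting().
    ZK = "zk1:2181"
    override_cmd = ("kafka-configs.sh --zookeeper %s --alter"
                    " --add-config producer_byte_rate=3750000,consumer_byte_rate=3000000"
                    " --entity-type clients --entity-name overridden_id" % ZK)
    default_cmd = ("kafka-configs.sh --zookeeper %s --alter"
                   " --add-config producer_byte_rate=1000000000,consumer_byte_rate=1000000000"
                   " --entity-type clients --entity-default" % ZK)

On the validation side, maximum_client_deviation_percentage is 100.0 and
maximum_broker_deviation_percentage is 5.0, so with the 3,750,000 B/s producer
override the client-side check tolerates peaks up to 7,500,000 B/s, while the broker
byte-in check allows at most 3,937,500 B/s.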

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client2/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client2/__init__.py b/tests/kafkatest/tests/client2/__init__.py
new file mode 100644
index 0000000..ec20143
--- /dev/null
+++ b/tests/kafkatest/tests/client2/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e035fc03/tests/kafkatest/tests/client2/compression_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client2/compression_test.py b/tests/kafkatest/tests/client2/compression_test.py
new file mode 100644
index 0000000..0de53ae
--- /dev/null
+++ b/tests/kafkatest/tests/client2/compression_test.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int_with_prefix
+
+class CompressionTest(ProduceConsumeValidateTest):
+    """
+    These tests validate produce / consume for compressed topics.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(CompressionTest, self).__init__(test_context=test_context)
+
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
+                                                                    "partitions": 10,
+                                                                    "replication-factor": 1}})
+        self.num_partitions = 10
+        self.timeout_sec = 60
+        self.producer_throughput = 1000
+        self.num_producers = 4
+        self.messages_per_producer = 1000
+        self.num_consumers = 1
+
+    def setUp(self):
+        self.zk.start()
+
+    def min_cluster_size(self):
+        # Override this since we're adding services outside of the constructor
+        return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+    @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=True)
+    @parametrize(compression_types=["snappy","gzip","lz4","none"], new_consumer=False)
+    def test_compressed_topic(self, compression_types, new_consumer):
+        """Test produce => consume => validate for compressed topics
+        Setup: 1 zk, 1 kafka node, 1 topic with partitions=10, replication-factor=1
+
+        The compression_types parameter gives a list of compression types (or no compression
+        for "none"). Each producer in the VerifiableProducer group (num_producers = 4) uses the
+        compression type from the list that corresponds to its index in the group.
+
+            - Produce messages in the background
+            - Consume messages in the background
+            - Stop producing, and finish consuming
+            - Validate that every acked message was consumed
+        """
+
+        self.kafka.security_protocol = "PLAINTEXT"
+        self.kafka.interbroker_security_protocol = self.kafka.security_protocol
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int_with_prefix,
+                                           compression_types=compression_types)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic,
+                                        new_consumer=new_consumer, consumer_timeout_ms=60000,
+                                        message_validator=is_int_with_prefix)
+        self.kafka.start()
+
+        self.run_produce_consume_validate(lambda: wait_until(
+            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+            timeout_sec=120, backoff_sec=1,
+            err_msg="Producer did not produce all messages in reasonable amount of time"))
+
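
The compression_types list is handed to VerifiableProducer as a whole; per the
docstring, each of the four producers picks the entry matching its index in the
group, so with ["snappy", "gzip", "lz4", "none"] one producer runs uncompressed and
the other three each use a different codec. A rough sketch of that per-node mapping
(an assumption about VerifiableProducer internals, which are not part of this diff):

    # Assumed index-based mapping inside VerifiableProducer (illustrative only).
    compression_types = ["snappy", "gzip", "lz4", "none"]
    num_producers = 4
    for idx in range(num_producers):
        codec = compression_types[idx % len(compression_types)]
        print("producer %d -> compression.type=%s" % (idx, codec))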

