Date: Tue, 27 Mar 2018 03:32:26 +0000
To: "commits@kafka.apache.org"
Subject: [kafka] branch 0.11.0 updated: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4761)
Message-ID: <152212154646.23107.14111293656712773546@gitbox.apache.org>
From: mjsax@apache.org
Reply-To: dev@kafka.apache.org
X-Git-Repo: kafka
X-Git-Refname: refs/heads/0.11.0
X-Git-Oldrev: 4d398e755f3a06497ccd8d6cd101f5f987b21b35
X-Git-Newrev: 13dbcad9bb2096195d45d30b59513f4db6a92b1a

This is an automated email from the ASF dual-hosted git
repository.

mjsax pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/0.11.0 by this push:
     new 13dbcad  KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4761)
13dbcad is described below

commit 13dbcad9bb2096195d45d30b59513f4db6a92b1a
Author: Matthias J. Sax
AuthorDate: Mon Mar 26 20:32:20 2018 -0700

    KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4761)

    Introduces new config parameter `upgrade.from`.

    Reviewers: Guozhang Wang, Bill Bejeck
---
 bin/kafka-run-class.sh | 40 +++-
 build.gradle | 36 +++
 .../authenticator/SaslClientCallbackHandler.java | 9 +-
 docs/streams/upgrade-guide.html | 27 ++-
 docs/upgrade.html | 54 ++++-
 gradle/dependencies.gradle | 6 +
 settings.gradle | 3 +-
 .../org/apache/kafka/streams/StreamsConfig.java | 17 ++
 .../internals/StreamPartitionAssignor.java | 17 +-
 .../internals/assignment/AssignmentInfo.java | 6 +-
 .../internals/assignment/SubscriptionInfo.java | 4 +-
 .../apache/kafka/streams/StreamsConfigTest.java | 47 ++--
 .../KStreamAggregationDedupIntegrationTest.java | 4 +-
 .../internals/StreamPartitionAssignorTest.java | 154 +++++++++----
 .../internals/assignment/AssignmentInfoTest.java | 3 +-
 .../kafka/streams/tests/SmokeTestDriver.java | 38 ++--
 .../apache/kafka/streams/tests/SmokeTestUtil.java | 9 +-
 .../kafka/streams/tests/StreamsSmokeTest.java | 14 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java | 73 ++++++
 .../kafka/streams/tests/StreamsUpgradeTest.java | 104 +++++++++
 .../kafka/streams/tests/StreamsUpgradeTest.java | 114 ++++++++++
 .../kafka/streams/tests/StreamsUpgradeTest.java | 108 +++++++++
 tests/kafkatest/services/streams.py | 173 ++++++++++++++-
 .../tests/streams/streams_upgrade_test.py | 246 +++++++++++++++++++++
 tests/kafkatest/version.py | 5 +-
 vagrant/base.sh | 2 +
 26 files changed, 1183 insertions(+), 130 deletions(-)

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index fe6aefd..8e2ba91 100755
---
a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -73,28 +73,50 @@ do
   fi
 done
 
-for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
-do
-  if should_include_file "$file"; then
-    CLASSPATH="$CLASSPATH":"$file"
-  fi
-done
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  clients_lib_dir=$(dirname $0)/../clients/build/libs
+  streams_lib_dir=$(dirname $0)/../streams/build/libs
+  rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
+else
+  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
+  streams_lib_dir=$clients_lib_dir
+  rocksdb_lib_dir=$streams_lib_dir
+fi
+
 
-for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
+for file in "$clients_lib_dir"/kafka-clients*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+for file in "$streams_lib_dir"/kafka-streams*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+else
+  VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
+  SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
+  for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+fi
+
+for file in "$rocksdb_lib_dir"/rocksdb*.jar;
 do
   CLASSPATH="$CLASSPATH":"$file"
 done
diff --git a/build.gradle b/build.gradle
index ce4b4e4..17f3e00 100644
--- a/build.gradle
+++ b/build.gradle
@@ -909,6 +909,42 @@
project(':streams:examples') { } } +project(':streams:upgrade-system-tests-0100') { + archivesBaseName = "kafka-streams-upgrade-system-tests-0100" + + dependencies { + testCompile libs.kafkaStreams_0100 + } + + systemTestLibs { + dependsOn testJar + } +} + +project(':streams:upgrade-system-tests-0101') { + archivesBaseName = "kafka-streams-upgrade-system-tests-0101" + + dependencies { + testCompile libs.kafkaStreams_0101 + } + + systemTestLibs { + dependsOn testJar + } +} + +project(':streams:upgrade-system-tests-0102') { + archivesBaseName = "kafka-streams-upgrade-system-tests-0102" + + dependencies { + testCompile libs.kafkaStreams_0102 + } + + systemTestLibs { + dependsOn testJar + } +} + project(':jmh-benchmarks') { apply plugin: 'com.github.johnrengelman.shadow' diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java index 7111bad..7102414 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.common.security.authenticator; -import java.util.Map; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.security.auth.AuthCallbackHandler; import javax.security.auth.Subject; import javax.security.auth.callback.Callback; @@ -25,10 +27,7 @@ import javax.security.auth.callback.PasswordCallback; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.sasl.AuthorizeCallback; import javax.security.sasl.RealmCallback; - -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.network.Mode; -import org.apache.kafka.common.security.auth.AuthCallbackHandler; +import java.util.Map; 
/** * Callback handler for Sasl clients. The callbacks required for the SASL mechanism diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 7f2c9f6..86d6d53 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -27,16 +27,33 @@

- If you want to upgrade from 0.10.1.x to 0.10.2, see the Upgrade Section for 0.10.2.
+ If you want to upgrade from 0.10.1.x to 0.11.0, see the Upgrade Section for 0.10.2.
  It highlights incompatible changes you need to consider to upgrade your code and application.
- See below a complete list of 0.10.2 API and semantical changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+ See below a complete list of 0.10.2 and 0.11.0 API and semantical changes
+ that allow you to advance your application and/or simplify your code base, including the usage of new features.

- If you want to upgrade from 0.10.0.x to 0.10.1, see the Upgrade Section for 0.10.1.
- It highlights incompatible changes you need to consider to upgrade your code and application.
- See below a complete list of 0.10.1 API changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+ Upgrading from 0.10.0.x to 0.11.0.x directly is also possible.
+ Note that brokers must be on version 0.10.1 or higher to run a Kafka Streams application version 0.10.1 or higher.
+ See Streams API changes in 0.10.1, Streams API changes in 0.10.2,
+ and Streams API changes in 0.11.0 for a complete list of API changes.
+ Upgrading to 0.11.0.3 requires two rolling bounces with config upgrade.from="0.10.0" set for the first upgrade phase
+ (cf. KIP-268).
+ As an alternative, an offline upgrade is also possible.

+
    +
  • prepare your application instances for a rolling bounce and make sure that config upgrade.from is set to "0.10.0" for new version 0.11.0.3
  • +
  • bounce each instance of your application once
  • +
  • prepare your newly deployed 0.11.0.3 application instances for a second round of rolling bounces; make sure to remove the value for config upgrade.from
  • +
  • bounce each instance of your application once more to complete the upgrade
  • +
+
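The two configuration states used across the rolling bounces above can be sketched as follows. This is a minimal illustration, not code from the commit: it uses plain `java.util.Properties` with the literal key `upgrade.from` and value `0.10.0` taken from this diff. The class name `UpgradeFromSketch`, its helper methods, and the `application.id` and `bootstrap.servers` values are placeholders assumed for the example.

```java
import java.util.Properties;

class UpgradeFromSketch {
    // Key and value as defined in this commit
    // (StreamsConfig.UPGRADE_FROM_CONFIG and StreamsConfig.UPGRADE_FROM_0100).
    static final String UPGRADE_FROM_CONFIG = "upgrade.from";
    static final String UPGRADE_FROM_0100 = "0.10.0";

    // Hypothetical helper: settings shared by both bounces (placeholder values).
    static Properties baseConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");
        props.put("bootstrap.servers", "localhost:9092");
        return props;
    }

    // First rolling bounce: new jar deployed, upgrade.from set to "0.10.0".
    static Properties firstBounceConfig() {
        Properties props = baseConfig();
        props.put(UPGRADE_FROM_CONFIG, UPGRADE_FROM_0100);
        return props;
    }

    // Second rolling bounce: same jar, upgrade.from removed (default is null).
    static Properties secondBounceConfig() {
        return baseConfig();
    }

    public static void main(String[] args) {
        System.out.println("first bounce:  upgrade.from="
                + firstBounceConfig().getProperty(UPGRADE_FROM_CONFIG));
        System.out.println("second bounce: upgrade.from="
                + secondBounceConfig().getProperty(UPGRADE_FROM_CONFIG));
    }
}
```

In a real application the same properties would presumably be handed to the Streams configuration; the only difference between the two phases is whether `upgrade.from` is present.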

Upgrading from 0.10.0.x to 0.11.0.0, 0.11.0.1, or 0.11.0.2 requires an offline upgrade (rolling bounce upgrade is not supported)

+
    +
  • stop all old (0.10.0.x) application instances
  • +
  • update your code and swap old code and jar file with new code and new jar file
  • +
  • restart all new (0.11.0.0, 0.11.0.1, or 0.11.0.2) application instances
  • +

Streams API changes in 0.11.0.0

diff --git a/docs/upgrade.html b/docs/upgrade.html index 9f0dbdf..0603875 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -64,6 +64,12 @@ before you switch to 0.11.0. +
Notable changes in 0.11.0.3
+
    +
  • New Kafka Streams configuration parameter upgrade.from added that allows rolling bounce upgrade from version 0.10.0.x
  • +
  • See the Kafka Streams upgrade guide for details about this new config. +
+
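As a rough sketch of why two bounces are needed, based on the StreamPartitionAssignor changes in this commit: an instance configured with upgrade.from="0.10.0" advertises subscription metadata version 1 instead of 2, and the assignment is encoded with the lowest version seen across all subscriptions. The class and method names below are hypothetical; only the version numbers and the minimum rule come from the diff.

```java
import java.util.Arrays;
import java.util.List;

class MetadataVersionSketch {
    // Mirrors SubscriptionInfo.CURRENT_VERSION in this commit.
    static final int CURRENT_VERSION = 2;

    // Version an instance puts into its subscription: 1 while
    // upgrade.from is "0.10.0", otherwise the current version.
    static int subscriptionVersion(String upgradeFrom) {
        return "0.10.0".equals(upgradeFrom) ? 1 : CURRENT_VERSION;
    }

    // Version used to encode the assignment: the minimum over all
    // subscription versions received, capped at the current version.
    static int assignmentVersion(List<Integer> subscriptionVersions) {
        int min = CURRENT_VERSION;
        for (int version : subscriptionVersions) {
            if (version < min) {
                min = version;
            }
        }
        return min;
    }

    public static void main(String[] args) {
        // During the first bounce: old instances and new instances with
        // upgrade.from set all send version 1.
        List<Integer> firstBounce = Arrays.asList(
                1, subscriptionVersion("0.10.0"), subscriptionVersion("0.10.0"));
        // After the second bounce: upgrade.from is unset everywhere.
        List<Integer> secondBounce = Arrays.asList(
                subscriptionVersion(null), subscriptionVersion(null));

        System.out.println("first bounce assignment version:  " + assignmentVersion(firstBounce));
        System.out.println("second bounce assignment version: " + assignmentVersion(secondBounce));
    }
}
```

Keeping every member on version 1 until all instances run the new jar means no 0.10.0.x instance ever has to decode version 2 metadata.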
Notable changes in 0.11.0.0
  • Unclean leader election is now disabled by default. The new default favors durability over availability. Users who wish to @@ -214,14 +220,41 @@ Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.
  • See Streams API changes in 0.10.2 for more details.
+
Upgrading a 0.10.0 Kafka Streams Application
+
    +
  • Upgrading your Streams application from 0.10.0 to 0.10.2 does require a broker upgrade because a Kafka Streams 0.10.2 application can only connect to 0.10.2 or 0.10.1 brokers.
  • +
  • There are a couple of API changes that are not backward compatible (cf. Streams API changes in 0.10.2 for more details). + Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application.
  • +
  • Upgrading from 0.10.0.x to 0.10.2.2 requires two rolling bounces with config upgrade.from="0.10.0" set for first upgrade phase + (cf. KIP-268). + As an alternative, an offline upgrade is also possible. +
      +
    • prepare your application instances for a rolling bounce and make sure that config upgrade.from is set to "0.10.0" for new version 0.10.2.2
    • +
    • bounce each instance of your application once
    • +
    • prepare your newly deployed 0.10.2.2 application instances for a second round of rolling bounces; make sure to remove the value for config upgrade.from
    • +
    • bounce each instance of your application once more to complete the upgrade
    • +
    +
  • +
  • Upgrading from 0.10.0.x to 0.10.2.0 or 0.10.2.1 requires an offline upgrade (rolling bounce upgrade is not supported) +
      +
    • stop all old (0.10.0.x) application instances
    • +
    • update your code and swap old code and jar file with new code and new jar file
    • +
    • restart all new (0.10.2.0 or 0.10.2.1) application instances
    • +
    +
  • +
+ +
Notable changes in 0.10.2.2
+
    +
  • New configuration parameter upgrade.from added that allows rolling bounce upgrade from version 0.10.0.x
  • +
+
Notable changes in 0.10.2.1
  • The default values for two configurations of the StreamsConfig class were changed to improve the resiliency of Kafka Streams applications. The internal Kafka Streams producer retries default value was changed from 0 to 10. The internal Kafka Streams consumer max.poll.interval.ms default value was changed from 300000 to Integer.MAX_VALUE.
- -
Notable changes in 0.10.2.0
  • The Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.10.2 clients @@ -294,6 +327,23 @@ only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older
  • Upgrading your Streams application from 0.10.0 to 0.10.1 does require a broker upgrade because a Kafka Streams 0.10.1 application can only connect to 0.10.1 brokers.
  • There are a couple of API changes that are not backward compatible (cf. Streams API changes in 0.10.1 for more details). Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application.
  • +
  • Upgrading from 0.10.0.x to 0.10.1.2 requires two rolling bounces with config upgrade.from="0.10.0" set for first upgrade phase + (cf. KIP-268). + As an alternative, an offline upgrade is also possible. +
      +
    • prepare your application instances for a rolling bounce and make sure that config upgrade.from is set to "0.10.0" for new version 0.10.1.2
    • +
    • bounce each instance of your application once
    • +
    • prepare your newly deployed 0.10.1.2 application instances for a second round of rolling bounces; make sure to remove the value for config upgrade.from
    • +
    • bounce each instance of your application once more to complete the upgrade
    • +
    +
  • +
  • Upgrading from 0.10.0.x to 0.10.1.0 or 0.10.1.1 requires an offline upgrade (rolling bounce upgrade is not supported) +
      +
    • stop all old (0.10.0.x) application instances
    • +
    • update your code and swap old code and jar file with new code and new jar file
    • +
    • restart all new (0.10.1.0 or 0.10.1.1) application instances
    • +
    +
Notable changes in 0.10.1.0
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 5d145e1..d881353 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -55,6 +55,9 @@ versions += [ jackson: "2.8.5", jetty: "9.2.22.v20170606", jersey: "2.24", + kafka_0100: "0.10.0.1", + kafka_0101: "0.10.1.1", + kafka_0102: "0.10.2.1", log4j: "1.2.17", jopt: "5.0.3", junit: "4.12", @@ -96,6 +99,9 @@ libs += [ junit: "junit:junit:$versions.junit", log4j: "log4j:log4j:$versions.log4j", joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt", + kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100", + kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101", + kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102", lz4: "net.jpountz.lz4:lz4:$versions.lz4", metrics: "com.yammer.metrics:metrics-core:$versions.metrics", powermock: "org.powermock:powermock-module-junit4:$versions.powermock", diff --git a/settings.gradle b/settings.gradle index f0fdf07..769046f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -13,5 +13,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender', +include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'streams:upgrade-system-tests-0100', + 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102', 'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks' diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index b411344..d45b135 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -106,6 +106,11 @@ public class StreamsConfig extends AbstractConfig { public static final String PRODUCER_PREFIX = "producer."; /** + * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}. + */ + public static final String UPGRADE_FROM_0100 = "0.10.0"; + + /** * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees. */ public static final String AT_LEAST_ONCE = "at_least_once"; @@ -247,6 +252,11 @@ public class StreamsConfig extends AbstractConfig { public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor"; private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the TimestampExtractor interface. This config is deprecated, use " + DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + " instead"; + /** {@code upgrade.from} */ + public static final String UPGRADE_FROM_CONFIG = "upgrade.from"; + public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " + + "Default is null. 
Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x)."; + /** * {@code value.serde} * @deprecated Use {@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG} instead. @@ -466,6 +476,12 @@ public class StreamsConfig extends AbstractConfig { null, Importance.LOW, TIMESTAMP_EXTRACTOR_CLASS_DOC) + .define(UPGRADE_FROM_CONFIG, + ConfigDef.Type.STRING, + null, + in(null, UPGRADE_FROM_0100), + Importance.LOW, + UPGRADE_FROM_DOC) .define(VALUE_SERDE_CLASS_CONFIG, Type.CLASS, null, @@ -632,6 +648,7 @@ public class StreamsConfig extends AbstractConfig { consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer"); // add configs required for stream partition assignor + consumerProps.put(UPGRADE_FROM_CONFIG, getString(UPGRADE_FROM_CONFIG)); consumerProps.put(InternalConfig.STREAM_THREAD_INSTANCE, streamThread); consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG)); consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 0a1b2ab..6e2bfa6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -165,6 +165,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable private String userEndPoint; private int numStandbyReplicas; + private int userMetadataVersion = SubscriptionInfo.CURRENT_VERSION; + private Cluster metadataWithInternalTopics; private Map> partitionsByHostState; @@ -192,6 +194,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable public void configure(Map configs) { numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); + final 
String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG); + if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) { + log.info("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x."); + userMetadataVersion = 1; + } + Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE); if (o == null) { KafkaException ex = new KafkaException("StreamThread is not specified"); @@ -251,7 +259,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable final Set previousActiveTasks = streamThread.prevActiveTasks(); Set standbyTasks = streamThread.cachedTasks(); standbyTasks.removeAll(previousActiveTasks); - SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, previousActiveTasks, standbyTasks, this.userEndPoint); + SubscriptionInfo data = new SubscriptionInfo(userMetadataVersion, streamThread.processId, previousActiveTasks, standbyTasks, this.userEndPoint); if (streamThread.builder.sourceTopicPattern() != null && !streamThread.builder.subscriptionUpdates().getUpdates().equals(topics)) { @@ -295,11 +303,16 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable // construct the client metadata from the decoded subscription info Map clientsMetadata = new HashMap<>(); + int minUserMetadataVersion = SubscriptionInfo.CURRENT_VERSION; for (Map.Entry entry : subscriptions.entrySet()) { String consumerId = entry.getKey(); Subscription subscription = entry.getValue(); SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData()); + final int usedVersion = info.version; + if (usedVersion < minUserMetadataVersion) { + minUserMetadataVersion = usedVersion; + } // create the new client metadata if necessary ClientMetadata clientMetadata = clientsMetadata.get(info.processId); @@ -556,7 +569,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } // finally, encode the assignment before sending back to coordinator - 
assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode())); + assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(minUserMetadataVersion, active, standby, partitionsByHostState).encode())); i++; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java index 77fb58a..5409976 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java @@ -55,7 +55,7 @@ public class AssignmentInfo { this(CURRENT_VERSION, activeTasks, standbyTasks, hostState); } - protected AssignmentInfo(int version, List activeTasks, Map> standbyTasks, + public AssignmentInfo(int version, List activeTasks, Map> standbyTasks, Map> hostState) { this.version = version; this.activeTasks = activeTasks; @@ -154,9 +154,7 @@ public class AssignmentInfo { } } - return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions); - - + return new AssignmentInfo(version, activeTasks, standbyTasks, hostStateToTopicPartitions); } catch (IOException ex) { throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java index f583dba..00227e7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java @@ -31,7 +31,7 @@ public class SubscriptionInfo { private static final Logger log = 
LoggerFactory.getLogger(SubscriptionInfo.class); - private static final int CURRENT_VERSION = 2; + public static final int CURRENT_VERSION = 2; public final int version; public final UUID processId; @@ -43,7 +43,7 @@ public class SubscriptionInfo { this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint); } - private SubscriptionInfo(int version, UUID processId, Set prevTasks, Set standbyTasks, String userEndPoint) { + public SubscriptionInfo(int version, UUID processId, Set prevTasks, Set standbyTasks, String userEndPoint) { this.version = version; this.processId = processId; this.prevTasks = prevTasks; diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 3bbd69e..9998283 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -82,7 +82,7 @@ public class StreamsConfigTest { } @Test - public void testGetProducerConfigs() throws Exception { + public void testGetProducerConfigs() { final String clientId = "client"; final Map returnedProps = streamsConfig.getProducerConfigs(clientId); assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId + "-producer"); @@ -91,7 +91,7 @@ public class StreamsConfigTest { } @Test - public void testGetConsumerConfigs() throws Exception { + public void testGetConsumerConfigs() { final String groupId = "example-application"; final String clientId = "client"; final Map returnedProps = streamsConfig.getConsumerConfigs(null, groupId, clientId); @@ -102,7 +102,7 @@ public class StreamsConfigTest { } @Test - public void testGetRestoreConsumerConfigs() throws Exception { + public void testGetRestoreConsumerConfigs() { final String clientId = "client"; final Map returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId); assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + 
"-restore-consumer"); @@ -143,7 +143,7 @@ public class StreamsConfigTest { } @Test - public void shouldSupportPrefixedConsumerConfigs() throws Exception { + public void shouldSupportPrefixedConsumerConfigs() { props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -153,7 +153,7 @@ public class StreamsConfigTest { } @Test - public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception { + public void shouldSupportPrefixedRestoreConsumerConfigs() { props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -163,7 +163,7 @@ public class StreamsConfigTest { } @Test - public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() throws Exception { + public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() { final StreamsConfig streamsConfig = new StreamsConfig(props); props.put(consumerPrefix("interceptor.statsd.host"), "host"); final Map consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId"); @@ -171,7 +171,7 @@ public class StreamsConfigTest { } @Test - public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() throws Exception { + public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() { final StreamsConfig streamsConfig = new StreamsConfig(props); props.put(consumerPrefix("interceptor.statsd.host"), "host"); final Map consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId"); @@ -179,7 +179,7 @@ public class StreamsConfigTest { } @Test - public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() throws Exception { + public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() { final 
StreamsConfig streamsConfig = new StreamsConfig(props); props.put(producerPrefix("interceptor.statsd.host"), "host"); final Map producerConfigs = streamsConfig.getProducerConfigs("clientId"); @@ -188,7 +188,7 @@ public class StreamsConfigTest { @Test - public void shouldSupportPrefixedProducerConfigs() throws Exception { + public void shouldSupportPrefixedProducerConfigs() { props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10); props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -198,7 +198,7 @@ public class StreamsConfigTest { } @Test - public void shouldBeSupportNonPrefixedConsumerConfigs() throws Exception { + public void shouldBeSupportNonPrefixedConsumerConfigs() { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -208,7 +208,7 @@ public class StreamsConfigTest { } @Test - public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws Exception { + public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -218,7 +218,7 @@ public class StreamsConfigTest { } @Test - public void shouldSupportNonPrefixedProducerConfigs() throws Exception { + public void shouldSupportNonPrefixedProducerConfigs() { props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10); props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -227,24 +227,22 @@ public class StreamsConfigTest { assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)); } - - @Test(expected = StreamsException.class) - public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() throws Exception { + public void 
shouldThrowStreamsExceptionIfKeySerdeConfigFails() { props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); final StreamsConfig streamsConfig = new StreamsConfig(props); streamsConfig.defaultKeySerde(); } @Test(expected = StreamsException.class) - public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() throws Exception { + public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() { props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); final StreamsConfig streamsConfig = new StreamsConfig(props); streamsConfig.defaultValueSerde(); } @Test - public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception { + public void shouldOverrideStreamsDefaultConsumerConfigs() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest"); props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10"); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -254,7 +252,7 @@ public class StreamsConfigTest { } @Test - public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception { + public void shouldOverrideStreamsDefaultProducerConfigs() { props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000"); final StreamsConfig streamsConfig = new StreamsConfig(props); final Map producerConfigs = streamsConfig.getProducerConfigs("clientId"); @@ -262,7 +260,7 @@ public class StreamsConfigTest { } @Test - public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throws Exception { + public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest"); props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10"); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -272,21 +270,21 @@ public class StreamsConfigTest { } @Test(expected = ConfigException.class) - 
public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() throws Exception { + public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true"); final StreamsConfig streamsConfig = new StreamsConfig(props); streamsConfig.getConsumerConfigs(null, "a", "b"); } @Test(expected = ConfigException.class) - public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() throws Exception { + public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true"); final StreamsConfig streamsConfig = new StreamsConfig(props); streamsConfig.getRestoreConsumerConfigs("client"); } @Test - public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() throws Exception { + public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() { final StreamsConfig streamsConfig = new StreamsConfig(props); final Map consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId"); assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.equalTo(false)); @@ -395,6 +393,7 @@ public class StreamsConfigTest { assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(commitIntervalMs)); } + @SuppressWarnings("deprecation") @Test public void shouldBeBackwardsCompatibleWithDeprecatedConfigs() { final Properties props = minimalStreamsConfig(); @@ -429,6 +428,7 @@ public class StreamsConfigTest { assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp); } + @SuppressWarnings("deprecation") @Test public void shouldSpecifyCorrectKeySerdeClassOnErrorUsingDeprecatedConfigs() { final Properties props = minimalStreamsConfig(); @@ -442,6 +442,7 @@ public class StreamsConfigTest { } } + @SuppressWarnings("deprecation") @Test public void shouldSpecifyCorrectKeySerdeClassOnError() { final Properties 
props = minimalStreamsConfig(); @@ -455,6 +456,7 @@ public class StreamsConfigTest { } } + @SuppressWarnings("deprecation") @Test public void shouldSpecifyCorrectValueSerdeClassOnErrorUsingDeprecatedConfigs() { final Properties props = minimalStreamsConfig(); @@ -468,6 +470,7 @@ public class StreamsConfigTest { } } + @SuppressWarnings("deprecation") @Test public void shouldSpecifyCorrectValueSerdeClassOnError() { final Properties props = minimalStreamsConfig(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 372b89c..bb4b575 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.integration; +import kafka.utils.MockTime; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerSerializer; @@ -51,7 +52,6 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; -import kafka.utils.MockTime; import org.junit.experimental.categories.Category; import static org.hamcrest.MatcherAssert.assertThat; @@ -315,6 +315,4 @@ public class KStreamAggregationDedupIntegrationTest { } - - } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index 98cd20a..a29380f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -135,7 +135,7 @@ public class StreamPartitionAssignorTest { @SuppressWarnings("unchecked") @Test - public void testSubscription() throws Exception { + public void testSubscription() { builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); @@ -187,7 +187,7 @@ public class StreamPartitionAssignorTest { } @Test - public void testAssignBasic() throws Exception { + public void testAssignBasic() { builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); @@ -239,12 +239,10 @@ public class StreamPartitionAssignorTest { assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions())); // check assignment info - - Set allActiveTasks = new HashSet<>(); + AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); // the first consumer - AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); - allActiveTasks.addAll(info10.activeTasks); + Set allActiveTasks = new HashSet<>(info10.activeTasks); // the second consumer AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); @@ -264,7 +262,7 @@ public class StreamPartitionAssignorTest { } @Test - public void testAssignWithPartialTopology() throws Exception { + public void testAssignWithPartialTopology() { Properties props = configProps(); props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class); StreamsConfig config = new StreamsConfig(props); @@ -306,9 +304,8 @@ public class StreamPartitionAssignorTest { Map assignments = partitionAssignor.assign(metadata, subscriptions); // check assignment info - Set allActiveTasks = new HashSet<>(); AssignmentInfo info10 = 
checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10")); - allActiveTasks.addAll(info10.activeTasks); + Set allActiveTasks = new HashSet<>(info10.activeTasks); assertEquals(3, allActiveTasks.size()); assertEquals(allTasks, new HashSet<>(allActiveTasks)); @@ -316,7 +313,7 @@ public class StreamPartitionAssignorTest { @Test - public void testAssignEmptyMetadata() throws Exception { + public void testAssignEmptyMetadata() { builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); @@ -359,9 +356,8 @@ public class StreamPartitionAssignorTest { new HashSet<>(assignments.get("consumer10").partitions())); // check assignment info - Set allActiveTasks = new HashSet<>(); AssignmentInfo info10 = checkAssignment(Collections.emptySet(), assignments.get("consumer10")); - allActiveTasks.addAll(info10.activeTasks); + Set allActiveTasks = new HashSet<>(info10.activeTasks); assertEquals(0, allActiveTasks.size()); assertEquals(Collections.emptySet(), new HashSet<>(allActiveTasks)); @@ -384,7 +380,7 @@ public class StreamPartitionAssignorTest { } @Test - public void testAssignWithNewTasks() throws Exception { + public void testAssignWithNewTasks() { builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addSource("source3", "topic3"); @@ -430,13 +426,9 @@ public class StreamPartitionAssignorTest { // check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match // also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and // then later ones will be re-assigned to other hosts due to load balancing - Set allActiveTasks = new HashSet<>(); - Set allPartitions = new HashSet<>(); - AssignmentInfo info; - - info = AssignmentInfo.decode(assignments.get("consumer10").userData()); - 
allActiveTasks.addAll(info.activeTasks); - allPartitions.addAll(assignments.get("consumer10").partitions()); + AssignmentInfo info = AssignmentInfo.decode(assignments.get("consumer10").userData()); + Set allActiveTasks = new HashSet<>(info.activeTasks); + Set allPartitions = new HashSet<>(assignments.get("consumer10").partitions()); info = AssignmentInfo.decode(assignments.get("consumer11").userData()); allActiveTasks.addAll(info.activeTasks); @@ -451,7 +443,7 @@ public class StreamPartitionAssignorTest { } @Test - public void testAssignWithStates() throws Exception { + public void testAssignWithStates() { String applicationId = "test"; builder.setApplicationId(applicationId); builder.addSource("source1", "topic1"); @@ -551,7 +543,7 @@ public class StreamPartitionAssignorTest { } @Test - public void testAssignWithStandbyReplicas() throws Exception { + public void testAssignWithStandbyReplicas() { Properties props = configProps(); props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1"); StreamsConfig config = new StreamsConfig(props); @@ -600,13 +592,10 @@ public class StreamPartitionAssignorTest { Map assignments = partitionAssignor.assign(metadata, subscriptions); - Set allActiveTasks = new HashSet<>(); - Set allStandbyTasks = new HashSet<>(); - // the first consumer AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); - allActiveTasks.addAll(info10.activeTasks); - allStandbyTasks.addAll(info10.standbyTasks.keySet()); + Set allActiveTasks = new HashSet<>(info10.activeTasks); + Set allStandbyTasks = new HashSet<>(info10.standbyTasks.keySet()); // the second consumer AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); @@ -634,7 +623,7 @@ public class StreamPartitionAssignorTest { } @Test - public void testOnAssignment() throws Exception { + public void testOnAssignment() { TopicPartition t2p3 = new TopicPartition("topic2", 3); TopologyBuilder builder = new TopologyBuilder(); @@ -677,7 +666,7 @@ 
public class StreamPartitionAssignorTest { } @Test - public void testAssignWithInternalTopics() throws Exception { + public void testAssignWithInternalTopics() { String applicationId = "test"; builder.setApplicationId(applicationId); builder.addInternalTopic("topicX"); @@ -722,7 +711,7 @@ public class StreamPartitionAssignorTest { } @Test - public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception { + public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() { String applicationId = "test"; builder.setApplicationId(applicationId); builder.addInternalTopic("topicX"); @@ -760,7 +749,7 @@ public class StreamPartitionAssignorTest { } @Test - public void shouldAddUserDefinedEndPointToSubscription() throws Exception { + public void shouldAddUserDefinedEndPointToSubscription() { final Properties properties = configProps(); properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080"); final StreamsConfig config = new StreamsConfig(properties); @@ -773,8 +762,8 @@ public class StreamPartitionAssignorTest { final UUID uuid1 = UUID.randomUUID(); final String client1 = "client1"; - final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0, stateDirectory); + final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, + uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0, stateDirectory); partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1)); final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input")); @@ -783,7 +772,82 @@ public class StreamPartitionAssignorTest { } @Test - public void shouldMapUserEndPointToTopicPartitions() throws Exception { + public void 
shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() { + final Map subscriptions = new HashMap<>(); + final Set emptyTasks = Collections.emptySet(); + subscriptions.put( + "consumer1", + new PartitionAssignor.Subscription( + Collections.singletonList("topic1"), + new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() + ) + ); + subscriptions.put( + "consumer2", + new PartitionAssignor.Subscription( + Collections.singletonList("topic1"), + new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() + ) + ); + + final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + StreamsConfig config = new StreamsConfig(configProps()); + + final TopologyBuilder builder = new TopologyBuilder(); + final StreamThread streamThread = new StreamThread( + builder, + config, + mockClientSupplier, + "appId", + "clientId", + UUID.randomUUID(), + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory); + + partitionAssignor.configure(config.getConsumerConfigs(streamThread, "test", "clientId")); + final Map assignment = partitionAssignor.assign(metadata, subscriptions); + + assertEquals(2, assignment.size()); + assertEquals(1, AssignmentInfo.decode(assignment.get("consumer1").userData()).version); + assertEquals(1, AssignmentInfo.decode(assignment.get("consumer2").userData()).version); + } + + @Test + public void shouldDownGradeSubscription() { + final Properties properties = configProps(); + properties.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100); + StreamsConfig config = new StreamsConfig(properties); + + TopologyBuilder builder = new TopologyBuilder(); + builder.addSource("source1", "topic1"); + + String clientId = "client-id"; + final StreamThread streamThread = new StreamThread( + builder, + config, + mockClientSupplier, + "appId", + "clientId", + UUID.randomUUID(), + new Metrics(), + Time.SYSTEM, + 
new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory); + + StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + partitionAssignor.configure(config.getConsumerConfigs(streamThread, "test", clientId)); + + PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1")); + + assertEquals(1, SubscriptionInfo.decode(subscription.userData()).version); + } + + @Test + public void shouldMapUserEndPointToTopicPartitions() { final Properties properties = configProps(); final String myEndPoint = "localhost:8080"; properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint); @@ -831,7 +895,7 @@ public class StreamPartitionAssignorTest { } @Test - public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception { + public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() { final Properties properties = configProps(); final String myEndPoint = "localhost"; properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint); @@ -865,7 +929,7 @@ public class StreamPartitionAssignorTest { } @Test - public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception { + public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() { final Properties properties = configProps(); final String myEndPoint = "localhost:j87yhk"; properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint); @@ -897,7 +961,7 @@ public class StreamPartitionAssignorTest { } @Test - public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception { + public void shouldExposeHostStateToTopicPartitionsOnAssignment() { List topic = Collections.singletonList(new TopicPartition("topic", 0)); final Map> hostState = Collections.singletonMap(new HostInfo("localhost", 80), @@ -910,7 +974,7 @@ public class StreamPartitionAssignorTest { } @Test - public void shouldSetClusterMetadataOnAssignment() throws 
Exception { + public void shouldSetClusterMetadataOnAssignment() { final List topic = Collections.singletonList(new TopicPartition("topic", 0)); final Map> hostState = Collections.singletonMap(new HostInfo("localhost", 80), @@ -930,7 +994,7 @@ public class StreamPartitionAssignorTest { } @Test - public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception { + public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() { final Cluster cluster = partitionAssignor.clusterMetadata(); assertNotNull(cluster); } @@ -1039,11 +1103,11 @@ public class StreamPartitionAssignorTest { new TopicPartition(applicationId + "-count-repartition", 1), new TopicPartition(applicationId + "-count-repartition", 2) ); - assertThat(new HashSet(assignment.get(client).partitions()), equalTo(new HashSet(expectedAssignment))); + assertThat(new HashSet<>(assignment.get(client).partitions()), equalTo(new HashSet<>(expectedAssignment))); } @Test - public void shouldUpdatePartitionHostInfoMapOnAssignment() throws Exception { + public void shouldUpdatePartitionHostInfoMapOnAssignment() { final TopicPartition partitionOne = new TopicPartition("topic", 1); final TopicPartition partitionTwo = new TopicPartition("topic", 2); final Map> firstHostState = Collections.singletonMap( @@ -1060,7 +1124,7 @@ public class StreamPartitionAssignorTest { } @Test - public void shouldUpdateClusterMetadataOnAssignment() throws Exception { + public void shouldUpdateClusterMetadataOnAssignment() { final TopicPartition topicOne = new TopicPartition("topic", 1); final TopicPartition topicTwo = new TopicPartition("topic2", 2); final Map> firstHostState = Collections.singletonMap( @@ -1076,7 +1140,7 @@ public class StreamPartitionAssignorTest { } @Test - public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception { + public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() { final Properties props = configProps(); 
props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1"); final StreamsConfig config = new StreamsConfig(props); @@ -1135,12 +1199,12 @@ public class StreamPartitionAssignorTest { } @Test(expected = KafkaException.class) - public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() throws Exception { + public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() { partitionAssignor.configure(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); } @Test(expected = KafkaException.class) - public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotStreamThreadInstance() throws Exception { + public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotStreamThreadInstance() { final Map config = new HashMap<>(); config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); config.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, "i am not a stream thread"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java index 9473a40..361dde8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java @@ -64,10 +64,9 @@ public class AssignmentInfoTest { assertEquals(oldVersion.activeTasks, decoded.activeTasks); assertEquals(oldVersion.standbyTasks, decoded.standbyTasks); assertEquals(0, decoded.partitionsByHost.size()); // should be empty as wasn't in V1 - assertEquals(2, decoded.version); // automatically upgraded to v2 on decode; + assertEquals(1, decoded.version); } - /** * This is a clone of what the V1 encoding did. 
The encode method has changed for V2 * so it is impossible to test compatibility without having this diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index 11e1ae8..3030615 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -77,7 +77,6 @@ public class SmokeTestDriver extends SmokeTestUtil { // This main() is not used by the system test. It is intended to be used for local debugging. public static void main(String[] args) throws Exception { final String kafka = "localhost:9092"; - final String zookeeper = "localhost:2181"; final File stateDir = TestUtils.tempDirectory(); final int numKeys = 20; @@ -131,42 +130,50 @@ public class SmokeTestDriver extends SmokeTestUtil { } public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) throws Exception { + return generate(kafka, numKeys, maxRecordsPerKey, true); + } + public static Map<String, Set<Integer>> generate(final String kafka, + final int numKeys, + final int maxRecordsPerKey, + final boolean autoTerminate) throws Exception { final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - // the next 4 config values make sure that all records are produced with no loss and - // no duplicates + // the next 2 config values make sure that all records are produced with no loss and no duplicates producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); - KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + final
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); int numRecordsProduced = 0; - Map<String, Set<Integer>> allData = new HashMap<>(); - ValueList[] data = new ValueList[numKeys]; + final Map<String, Set<Integer>> allData = new HashMap<>(); + final ValueList[] data = new ValueList[numKeys]; for (int i = 0; i < numKeys; i++) { data[i] = new ValueList(i, i + maxRecordsPerKey - 1); allData.put(data[i].key, new HashSet<Integer>()); } - Random rand = new Random(); + final Random rand = new Random(); - int remaining = data.length; + int remaining = 1; // dummy value must be positive if <autoTerminate> is false + if (autoTerminate) { + remaining = data.length; + } while (remaining > 0) { - int index = rand.nextInt(remaining); - String key = data[index].key; + final int index = autoTerminate ? rand.nextInt(remaining) : rand.nextInt(numKeys); + final String key = data[index].key; int value = data[index].next(); - if (value < 0) { + if (autoTerminate && value < 0) { remaining--; data[index] = data[remaining]; } else { - ProducerRecord<byte[], byte[]> record = - new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value)); + final ProducerRecord<byte[], byte[]> record = + new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value)); producer.send(record, new Callback() { @Override @@ -178,11 +185,12 @@ public class SmokeTestDriver extends SmokeTestUtil { } }); - numRecordsProduced++; allData.get(key).add(value); - if (numRecordsProduced % 100 == 0) + + if (numRecordsProduced % 100 == 0) { System.out.println(numRecordsProduced + " records produced"); + } Utils.sleep(2); } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java index 150ec7d..11845b4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java @@ -44,20 +44,15 @@ public class
SmokeTestUtil { public Processor get() { return new AbstractProcessor() { private int numRecordsProcessed = 0; - private ProcessorContext context; @Override public void init(final ProcessorContext context) { System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId()); numRecordsProcessed = 0; - this.context = context; } @Override public void process(final Object key, final Object value) { - if (printOffset) { - System.out.println(">>> " + context.offset()); - } numRecordsProcessed++; if (numRecordsProcessed % 100 == 0) { System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); @@ -65,10 +60,10 @@ public class SmokeTestUtil { } @Override - public void punctuate(final long timestamp) { } + public void punctuate(final long timestamp) {} @Override - public void close() { } + public void close() {} }; } }; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java index 244aa8e..699aaeb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java @@ -23,7 +23,7 @@ import java.util.Set; public class StreamsSmokeTest { /** - * args ::= command kafka zookeeper stateDir + * args ::= command kafka zookeeper stateDir disableAutoTerminate * command := "run" | "process" * * @param args @@ -32,11 +32,13 @@ public class StreamsSmokeTest { String kafka = args[0]; String stateDir = args.length > 1 ? args[1] : null; String command = args.length > 2 ? 
args[2] : null; + boolean disableAutoTerminate = args.length > 3; - System.out.println("StreamsTest instance started"); + System.out.println("StreamsTest instance started (StreamsSmokeTest)"); System.out.println("command=" + command); System.out.println("kafka=" + kafka); System.out.println("stateDir=" + stateDir); + System.out.println("disableAutoTerminate=" + disableAutoTerminate); switch (command) { case "standalone": @@ -46,8 +48,12 @@ public class StreamsSmokeTest { // this starts the driver (data generation and result verification) final int numKeys = 10; final int maxRecordsPerKey = 500; - Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey); - SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey); + if (disableAutoTerminate) { + SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, false); + } else { + Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey); + SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey); + } break; case "process": // this starts a KafkaStreams client diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java new file mode 100644 index 0000000..0ee47e4 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; + +import java.util.Properties; + +public class StreamsUpgradeTest { + + @SuppressWarnings("unchecked") + public static void main(final String[] args) { + if (args.length < 2) { + System.err.println("StreamsUpgradeTest requires two argument (kafka-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: " + + (args.length > 0 ? args[0] : "")); + } + final String kafka = args[0]; + final String stateDir = args[1]; + final String upgradeFrom = args.length > 2 ? 
args[2] : null; + + System.out.println("StreamsTest instance started (StreamsUpgradeTest trunk)"); + System.out.println("kafka=" + kafka); + System.out.println("stateDir=" + stateDir); + System.out.println("upgradeFrom=" + upgradeFrom); + + final KStreamBuilder builder = new KStreamBuilder(); + + final KStream dataStream = builder.stream("data"); + dataStream.process(SmokeTestUtil.printProcessorSupplier("data")); + dataStream.to("echo"); + + final Properties config = new Properties(); + config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); + config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + if (upgradeFrom != null) { + config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom); + } + + + final KafkaStreams streams = new KafkaStreams(builder, config); + streams.start(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + System.out.println("closing Kafka Streams instance"); + System.out.flush(); + streams.close(); + System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); + System.out.flush(); + } + }); + } +} diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java new file mode 100644 index 0000000..72d7f5a --- /dev/null +++ b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import java.util.Properties; + +public class StreamsUpgradeTest { + + @SuppressWarnings("unchecked") + public static void main(final String[] args) { + if (args.length < 3) { + System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, state-dir) but only " + args.length + " provided: " + + (args.length > 0 ? args[0] + " " : "") + + (args.length > 1 ? 
args[1] : "")); + } + final String kafka = args[0]; + final String zookeeper = args[1]; + final String stateDir = args[2]; + + System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.0)"); + System.out.println("kafka=" + kafka); + System.out.println("zookeeper=" + zookeeper); + System.out.println("stateDir=" + stateDir); + + final KStreamBuilder builder = new KStreamBuilder(); + final KStream dataStream = builder.stream("data"); + dataStream.process(printProcessorSupplier()); + dataStream.to("echo"); + + final Properties config = new Properties(); + config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); + config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); + config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + + final KafkaStreams streams = new KafkaStreams(builder, config); + streams.start(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + System.out.println("closing Kafka Streams instance"); + System.out.flush(); + streams.close(); + System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); + System.out.flush(); + } + }); + } + + private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() { + return new ProcessorSupplier<K, V>() { + public Processor<K, V> get() { + return new AbstractProcessor<K, V>() { + private int numRecordsProcessed = 0; + + @Override + public void init(final ProcessorContext context) { + System.out.println("initializing processor: topic=data taskId=" + context.taskId()); + numRecordsProcessed = 0; + } + + @Override + public void process(final K key, final V value) { + numRecordsProcessed++; + if (numRecordsProcessed % 100 == 0) { + System.out.println("processed " + numRecordsProcessed + " records from topic=data"); + } + } + + @Override + public void punctuate(final long timestamp) {} + + @Override + public void close() {} +
}; + } + }; + } +} diff --git a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java new file mode 100644 index 0000000..eebd0fa --- /dev/null +++ b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    /**
+     * This test cannot be run executed, as long as Kafka 0.10.1.2 is not released
+     */
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) {
+        if (args.length < 3) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] + " " : "")
+                + (args.length > 1 ? args[1] : ""));
+        }
+        final String kafka = args[0];
+        final String zookeeper = args[1];
+        final String stateDir = args[2];
+        final String upgradeFrom = args.length > 3 ? args[3] : null;
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.1)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("zookeeper=" + zookeeper);
+        System.out.println("stateDir=" + stateDir);
+        System.out.println("upgradeFrom=" + upgradeFrom);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        if (upgradeFrom != null) {
+            // TODO: because Kafka 0.10.1.2 is not released yet, thus `UPGRADE_FROM_CONFIG` is not available yet
+            //config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
+            config.setProperty("upgrade.from", upgradeFrom);
+        }
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(final long timestamp) {}
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..18240f0
--- /dev/null
+++ b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    /**
+     * This test cannot be run executed, as long as Kafka 0.10.2.2 is not released
+     */
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        final String kafka = args[0];
+        final String stateDir = args[1];
+        final String upgradeFrom = args.length > 2 ? args[2] : null;
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.2)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("stateDir=" + stateDir);
+        System.out.println("upgradeFrom=" + upgradeFrom);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        if (upgradeFrom != null) {
+            // TODO: because Kafka 0.10.2.2 is not released yet, thus `UPGRADE_FROM_CONFIG` is not available yet
+            //config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
+            config.setProperty("upgrade.from", upgradeFrom);
+        }
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(final long timestamp) {}
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index e6f692b..eeb1681 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -20,6 +20,7 @@ from ducktape.services.service import Service
 from ducktape.utils.util import wait_until

 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1


 class StreamsTestBaseService(KafkaPathResolverMixin, Service):
@@ -33,6 +34,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
     LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
     PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")

+    CLEAN_NODE_ENABLED = True
+
     logs = {
         "streams_log": {
             "path": LOG_FILE,
@@ -43,6 +46,114 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
         "streams_stderr": {
             "path": STDERR_FILE,
             "collect_default": True},
+        "streams_log.0-1": {
+            "path": LOG_FILE + ".0-1",
+            "collect_default": True},
+        "streams_stdout.0-1": {
+            "path": STDOUT_FILE + ".0-1",
+            "collect_default": True},
+        "streams_stderr.0-1": {
+            "path": STDERR_FILE + ".0-1",
+            "collect_default": True},
+        "streams_log.0-2": {
+            "path": LOG_FILE + ".0-2",
+            "collect_default": True},
+        "streams_stdout.0-2": {
+            "path": STDOUT_FILE + ".0-2",
+            "collect_default": True},
+        "streams_stderr.0-2": {
+            "path": STDERR_FILE + ".0-2",
+            "collect_default": True},
+        "streams_log.0-3": {
+            "path": LOG_FILE + ".0-3",
+            "collect_default": True},
+        "streams_stdout.0-3": {
+            "path": STDOUT_FILE + ".0-3",
+            "collect_default": True},
+        "streams_stderr.0-3": {
+            "path": STDERR_FILE + ".0-3",
+            "collect_default": True},
+        "streams_log.0-4": {
+            "path": LOG_FILE + ".0-4",
+            "collect_default": True},
+        "streams_stdout.0-4": {
+            "path": STDOUT_FILE + ".0-4",
+            "collect_default": True},
+        "streams_stderr.0-4": {
+            "path": STDERR_FILE + ".0-4",
+            "collect_default": True},
+        "streams_log.0-5": {
+            "path": LOG_FILE + ".0-5",
+            "collect_default": True},
+        "streams_stdout.0-5": {
+            "path": STDOUT_FILE + ".0-5",
+            "collect_default": True},
+        "streams_stderr.0-5": {
+            "path": STDERR_FILE + ".0-5",
+            "collect_default": True},
+        "streams_log.0-6": {
+            "path": LOG_FILE + ".0-6",
+            "collect_default": True},
+        "streams_stdout.0-6": {
+            "path": STDOUT_FILE + ".0-6",
+            "collect_default": True},
+        "streams_stderr.0-6": {
+            "path": STDERR_FILE + ".0-6",
+            "collect_default": True},
+        "streams_log.1-1": {
+            "path": LOG_FILE + ".1-1",
+            "collect_default": True},
+        "streams_stdout.1-1": {
+            "path": STDOUT_FILE + ".1-1",
+            "collect_default": True},
+        "streams_stderr.1-1": {
+            "path": STDERR_FILE + ".1-1",
+            "collect_default": True},
+        "streams_log.1-2": {
+            "path": LOG_FILE + ".1-2",
+            "collect_default": True},
+        "streams_stdout.1-2": {
+            "path": STDOUT_FILE + ".1-2",
+            "collect_default": True},
+        "streams_stderr.1-2": {
+            "path": STDERR_FILE + ".1-2",
+            "collect_default": True},
+        "streams_log.1-3": {
+            "path": LOG_FILE + ".1-3",
+            "collect_default": True},
+        "streams_stdout.1-3": {
+            "path": STDOUT_FILE + ".1-3",
+            "collect_default": True},
+        "streams_stderr.1-3": {
+            "path": STDERR_FILE + ".1-3",
+            "collect_default": True},
+        "streams_log.1-4": {
+            "path": LOG_FILE + ".1-4",
+            "collect_default": True},
+        "streams_stdout.1-4": {
+            "path": STDOUT_FILE + ".1-4",
+            "collect_default": True},
+        "streams_stderr.1-4": {
+            "path": STDERR_FILE + ".1-4",
+            "collect_default": True},
+        "streams_log.1-5": {
+            "path": LOG_FILE + ".1-5",
+            "collect_default": True},
+        "streams_stdout.1-5": {
+            "path": STDOUT_FILE + ".1-5",
+            "collect_default": True},
+        "streams_stderr.1-5": {
+            "path": STDERR_FILE + ".1-5",
+            "collect_default": True},
+        "streams_log.1-6": {
+            "path": LOG_FILE + ".1-6",
+            "collect_default": True},
+        "streams_stdout.1-6": {
+            "path": STDOUT_FILE + ".1-6",
+            "collect_default": True},
+        "streams_stderr.1-6": {
+            "path": STDERR_FILE + ".1-6",
+            "collect_default": True},
     }

     def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1=None, user_test_args2=None):
@@ -107,7 +218,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):

     def clean_node(self, node):
         node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
+        if self.CLEAN_NODE_ENABLED:
+            node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)

     def start_cmd(self, node):
         args = self.args.copy()
@@ -163,7 +275,28 @@ class StreamsEosTestBaseService(StreamsTestBaseService):
 class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsSmokeTestDriverService, self).__init__(test_context, kafka, "run")
+        self.DISABLE_AUTO_TERMINATE = ""
+
+    def disable_auto_terminate(self):
+        self.DISABLE_AUTO_TERMINATE = "disableAutoTerminate"
+
+    def start_cmd(self, node):
+        args = self.args.copy()
+        args['kafka'] = self.kafka.bootstrap_servers()
+        args['state_dir'] = self.PERSISTENT_ROOT
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['disable_auto_terminate'] = self.DISABLE_AUTO_TERMINATE
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)

+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
+              " %(kafka)s %(state_dir)s %(user_test_args)s %(disable_auto_terminate)s" \
+              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+        return cmd

 class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
@@ -206,3 +339,41 @@ class StreamsBrokerCompatibilityService(StreamsTestBaseService):
                                                                 kafka,
                                                                 "org.apache.kafka.streams.tests.BrokerCompatibilityTest",
                                                                 eosEnabled)
+
+class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context,
+                                                                 kafka,
+                                                                 "org.apache.kafka.streams.tests.StreamsUpgradeTest",
+                                                                 "")
+        self.UPGRADE_FROM = ""
+
+    def set_version(self, kafka_streams_version):
+        self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+    def set_upgrade_from(self, upgrade_from):
+        self.UPGRADE_FROM = upgrade_from
+
+    def start_cmd(self, node):
+        args = self.args.copy()
+        args['kafka'] = self.kafka.bootstrap_servers()
+        if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
+            args['zk'] = self.kafka.zk.connect_setting()
+        else:
+            args['zk'] = ""
+        args['state_dir'] = self.PERSISTENT_ROOT
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['version'] = self.KAFKA_STREAMS_VERSION
+        args['upgrade_from'] = self.UPGRADE_FROM
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              "INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
+              " %(kafka_run_class)s %(streams_class_name)s " \
+              " %(kafka)s %(zk)s %(state_dir)s %(user_test_args)s %(upgrade_from)s" \
+              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+        return cmd
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
new file mode 100644
index 0000000..7aa2de6
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -0,0 +1,246 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsUpgradeTestJobRunnerService
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, DEV_VERSION
+import random
+
+class StreamsUpgradeTest(KafkaTest):
+    """
+    Test upgrading Kafka Streams (all version combination)
+    If metadata was changes, upgrade is more difficult
+    Metadata version was bumped in 0.10.1.0
+    """
+
+    def __init__(self, test_context):
+        super(StreamsUpgradeTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            'echo' : { 'partitions': 5 },
+            'data' : { 'partitions': 5 }
+        })
+
+        self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
+        self.driver.disable_auto_terminate()
+        self.processor1 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+        self.processor2 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+        self.processor3 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+
+    @parametrize(old_version=str(LATEST_0_10_1), new_version=str(LATEST_0_10_2))
+    @parametrize(old_version=str(LATEST_0_10_1), new_version=str(DEV_VERSION))
+    @parametrize(old_version=str(LATEST_0_10_2), new_version=str(DEV_VERSION))
+    def test_simple_upgrade(self, old_version, new_version):
+        """
+        Starts 3 KafkaStreams instances with <old_version>, and upgrades one-by-one to <new_version>
+        """
+
+        self.driver.start()
+        self.start_all_nodes_with(old_version)
+
+        self.processors = [self.processor1, self.processor2, self.processor3]
+
+        counter = 1
+        random.seed()
+
+        random.shuffle(self.processors)
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+            self.do_rolling_bounce(p, "", new_version, counter)
+            counter = counter + 1
+
+        # shutdown
+        self.driver.stop()
+        self.driver.wait()
+
+        random.shuffle(self.processors)
+        for p in self.processors:
+            node = p.node
+            with node.account.monitor_log(p.STDOUT_FILE) as monitor:
+                p.stop()
+                monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
+
+        self.driver.stop()
+
+    #@parametrize(new_version=str(LATEST_0_10_1)) we cannot run this test until Kafka 0.10.1.2 is released
+    #@parametrize(new_version=str(LATEST_0_10_2)) we cannot run this test until Kafka 0.10.2.2 is released
+    @parametrize(new_version=str(DEV_VERSION))
+    def test_metadata_upgrade(self, new_version):
+        """
+        Starts 3 KafkaStreams instances with version 0.10.0, and upgrades one-by-one to <new_version>
+        """
+
+        self.driver.start()
+        self.start_all_nodes_with(str(LATEST_0_10_0))
+
+        self.processors = [self.processor1, self.processor2, self.processor3]
+
+        counter = 1
+        random.seed()
+
+        # first rolling bounce
+        random.shuffle(self.processors)
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+            self.do_rolling_bounce(p, "0.10.0", new_version, counter)
+            counter = counter + 1
+
+        # second rolling bounce
+        random.shuffle(self.processors)
+        for p in self.processors:
+            self.do_rolling_bounce(p, "", new_version, counter)
+            counter = counter + 1
+
+        # shutdown
+        self.driver.stop()
+        self.driver.wait()
+
+        random.shuffle(self.processors)
+        for p in self.processors:
+            node = p.node
+            with node.account.monitor_log(p.STDOUT_FILE) as monitor:
+                p.stop()
+                monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
+
+        self.driver.stop()
+
+    def start_all_nodes_with(self, version):
+        # start first with <version>
+        self.prepare_for(self.processor1, version)
+        node1 = self.processor1.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor:
+            with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor:
+                self.processor1.start()
+                log_monitor.wait_until("Kafka version : " + version,
+                                       timeout_sec=60,
+                                       err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
+                monitor.wait_until("processed 100 records from topic",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+
+        # start second with <version>
+        self.prepare_for(self.processor2, version)
+        node2 = self.processor2.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+                with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor:
+                    self.processor2.start()
+                    log_monitor.wait_until("Kafka version : " + version,
+                                           timeout_sec=60,
+                                           err_msg="Could not detect Kafka Streams version " + version + " " + str(node2.account))
+                    first_monitor.wait_until("processed 100 records from topic",
+                                             timeout_sec=60,
+                                             err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+                    second_monitor.wait_until("processed 100 records from topic",
+                                              timeout_sec=60,
+                                              err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
+
+        # start third with <version>
+        self.prepare_for(self.processor3, version)
+        node3 = self.processor3.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+                with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:
+                    with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor:
+                        self.processor3.start()
+                        log_monitor.wait_until("Kafka version : " + version,
+                                               timeout_sec=60,
+                                               err_msg="Could not detect Kafka Streams version " + version + " " + str(node3.account))
+                        first_monitor.wait_until("processed 100 records from topic",
+                                                 timeout_sec=60,
+                                                 err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+                        second_monitor.wait_until("processed 100 records from topic",
+                                                  timeout_sec=60,
+                                                  err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
+                        third_monitor.wait_until("processed 100 records from topic",
+                                                 timeout_sec=60,
+                                                 err_msg="Never saw output 'processed 100 records from topic' on" + str(node3.account))
+
+    @staticmethod
+    def prepare_for(processor, version):
+        processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False)
+        processor.set_version(version)
+
+    def do_rolling_bounce(self, processor, upgrade_from, new_version, counter):
+        first_other_processor = None
+        second_other_processor = None
+        for p in self.processors:
+            if p != processor:
+                if first_other_processor is None:
+                    first_other_processor = p
+                else:
+                    second_other_processor = p
+
+        node = processor.node
+        first_other_node = first_other_processor.node
+        second_other_node = second_other_processor.node
+
+        # stop processor and wait for rebalance of others
+        with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+            with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+                processor.stop()
+                first_other_monitor.wait_until("processed 100 records from topic",
+                                               timeout_sec=60,
+                                               err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
+                second_other_monitor.wait_until("processed 100 records from topic",
+                                                timeout_sec=60,
+                                                err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
+        node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
+
+        if upgrade_from == "": # upgrade disabled -- second round of rolling bounces
+            roll_counter = ".1-" # second round of rolling bounces
+        else:
+            roll_counter = ".0-" # first round of rolling boundes
+
+        node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + roll_counter + str(counter), allow_fail=False)
+        node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + roll_counter + str(counter), allow_fail=False)
+        node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + roll_counter + str(counter), allow_fail=False)
+
+        if new_version == str(DEV_VERSION):
+            processor.set_version("") # set to TRUNK
+        else:
+            processor.set_version(new_version)
+        processor.set_upgrade_from(upgrade_from)
+
+        grep_metadata_error = "grep \"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2\" "
+        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+                with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+                    with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+                        processor.start()
+
+                        log_monitor.wait_until("Kafka version : " + new_version,
+                                               timeout_sec=60,
+                                               err_msg="Could not detect Kafka Streams version " + new_version + " " + str(node.account))
+                        first_other_monitor.wait_until("processed 100 records from topic",
+                                                       timeout_sec=60,
+                                                       err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
+                        found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True))
+                        if len(found) > 0:
+                            raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
+
+                        second_other_monitor.wait_until("processed 100 records from topic",
+                                                        timeout_sec=60,
+                                                        err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
+                        found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True))
+                        if len(found) > 0:
+                            raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
+
+                        monitor.wait_until("processed 100 records from topic",
+                                           timeout_sec=60,
+                                           err_msg="Never saw output 'processed 100 records from topic' on" + str(node.account))
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index f63a7c1..94ba100 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -61,6 +61,7 @@ def get_version(node=None):
         return DEV_BRANCH

 DEV_BRANCH = KafkaVersion("dev")
+DEV_VERSION = KafkaVersion("0.11.0.3-SNAPSHOT")

 # 0.8.2.X versions
 V_0_8_2_1 = KafkaVersion("0.8.2.1")
@@ -91,5 +92,7 @@ LATEST_0_10 = LATEST_0_10_2

 # 0.11.0.0 versions
 V_0_11_0_0 = KafkaVersion("0.11.0.0")
-LATEST_0_11_0 = V_0_11_0_0
+V_0_11_0_1 = KafkaVersion("0.11.0.1")
+V_0_11_0_2 = KafkaVersion("0.11.0.2")
+LATEST_0_11_0 = V_0_11_0_2
 LATEST_0_11 = LATEST_0_11_0
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 4c0add5..28b81ed 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -64,6 +64,8 @@ get_kafka() {

     kafka_dir=/opt/kafka-$version
     url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_$scala_version-$version.tgz
+    # the .tgz above does not include the streams test jar hence we need to get it separately
+    url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka-streams-$version-test.jar
     if [ ! -d /opt/kafka-$version ]; then
         pushd /tmp
         curl -O $url

--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.