kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Fix broker compatibility tests
Date Sun, 03 Dec 2017 17:05:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2d8918b1a -> 9204197ab


MINOR: Fix broker compatibility tests

Updated the System test `stream_broker_compatibility_test.py` to address system test failures
as we have removed explicit broker version checking

- Ignore the `0.8.2.2` and `0.9.0.0` tests because the `NetworkClient` only logs `UnsupportedVersionException`s
that occur and will continue to retry connecting.  Once issue https://issues.apache.org/jira/browse/KAFKA-6297
is addressed, we may re-enable these tests.
- Updated existing tests expected error messages
- Updated Streams code in test for to make sure we fail fast for incompatible brokers

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #4286 from bbejeck/MINOR_fix_broker_compatibility_tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9204197a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9204197a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9204197a

Branch: refs/heads/trunk
Commit: 9204197abf2e13d345ad6ee6a7f827268a49ebca
Parents: 2d8918b
Author: Bill Bejeck <bill@confluent.io>
Authored: Sun Dec 3 09:05:18 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sun Dec 3 09:05:18 2017 -0800

----------------------------------------------------------------------
 .../streams/tests/BrokerCompatibilityTest.java  | 22 ++++++++++++++++----
 .../streams_broker_compatibility_test.py        | 11 +++++-----
 2 files changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9204197a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index ca7620d..ef1f63c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -24,12 +24,15 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.requests.IsolationLevel;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.TestUtils;
 
 import java.io.File;
@@ -61,6 +64,7 @@ public class BrokerCompatibilityTest {
         streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         if (eosEnabled) {
             streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
         }
@@ -70,17 +74,27 @@ public class BrokerCompatibilityTest {
         streamsProperties.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout + 1);
         //TODO remove this config or set to smaller value when KIP-91 is merged
         streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
60000);
+        Serde<String> stringSerde = Serdes.String();
 
 
         final StreamsBuilder builder = new StreamsBuilder();
-        builder.stream(SOURCE_TOPIC).to(SINK_TOPIC);
+        builder.<String, String>stream(SOURCE_TOPIC).groupByKey(Serialized.with(stringSerde,
stringSerde))
+            .count()
+            .toStream()
+            .mapValues(new ValueMapper<Long, String>() {
+                @Override
+                public String apply(Long value) {
+                    return value.toString();
+                }
+            })
+            .to(SINK_TOPIC);
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties);
         streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(final Thread t, final Throwable e) {
-                System.out.println("FATAL: An unexpected exception is encountered on thread
" + t + ": " + e);
-
+                System.err.println("FATAL: An unexpected exception " + e);
+                System.err.flush();
                 streams.close(30, TimeUnit.SECONDS);
             }
         });
@@ -124,7 +138,7 @@ public class BrokerCompatibilityTest {
         while (true) {
             final ConsumerRecords<String, String> records = consumer.poll(100);
             for (final ConsumerRecord<String, String> record : records) {
-                if (record.key().equals("key") && record.value().equals("value"))
{
+                if (record.key().equals("key") && record.value().equals("1")) {
                     consumer.close();
                     return;
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9204197a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
index 1405c34..92a8c1e 100644
--- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
@@ -13,10 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.mark import ignore
 from ducktape.mark import parametrize
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
-
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StreamsBrokerCompatibilityService
 from kafkatest.services.verifiable_consumer import VerifiableConsumer
@@ -67,9 +67,9 @@ class StreamsBrokerCompatibility(Test):
 
         processor.node.account.ssh(processor.start_cmd(processor.node))
         with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
-            monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.StreamsException:
Setting processing.guarantee=exactly_once requires broker version 0.11.0.x or higher.',
+            monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException:
The broker does not support LIST_OFFSETS ',
                                timeout_sec=60,
-                               err_msg="Never saw 'EOS requires broker version 0.11+' error
message " + str(processor.node.account))
+                               err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException:
The broker does not support LIST_OFFSETS ' error message " + str(processor.node.account))
 
         self.kafka.stop()
 
@@ -102,12 +102,13 @@ class StreamsBrokerCompatibility(Test):
 
         processor.node.account.ssh(processor.start_cmd(processor.node))
         with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
-            monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.StreamsException:
Kafka Streams requires broker version 0.10.1.x or higher.',
+            monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.streams.errors.StreamsException:
Could not create internal topics.',
                         timeout_sec=60,
-                        err_msg="Never saw 'Streams requires broker verion 0.10.1+' error
message " + str(processor.node.account))
+                        err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.streams.errors.StreamsException:
Could not create internal topics.' error message " + str(processor.node.account))
 
         self.kafka.stop()
 
+    @ignore
     @parametrize(broker_version=str(LATEST_0_9))
     @parametrize(broker_version=str(LATEST_0_8_2))
     def test_timeout_on_pre_010_brokers(self, broker_version):


Mime
View raw message