kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject [1/3] kafka git commit: KAFKA-2475: Make Copycat only have a Converter class instead of Serializer, Deserializer, and Converter.
Date Mon, 31 Aug 2015 19:26:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9c936b186 -> 3803e5cb3


http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/tests/kafkatest/tests/copycat_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/copycat_test.py b/tests/kafkatest/tests/copycat_test.py
index 344f7ef..b4adf53 100644
--- a/tests/kafkatest/tests/copycat_test.py
+++ b/tests/kafkatest/tests/copycat_test.py
@@ -15,8 +15,10 @@
 
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.copycat import CopycatStandaloneService
+from kafkatest.services.console_consumer import ConsoleConsumer
 from ducktape.utils.util import wait_until
-import hashlib, subprocess
+from ducktape.mark import parametrize
+import hashlib, subprocess, json
 
 class CopycatStandaloneFileTest(KafkaTest):
     """
@@ -30,8 +32,14 @@ class CopycatStandaloneFileTest(KafkaTest):
 
     OFFSETS_FILE = "/mnt/copycat.offsets"
 
-    FIRST_INPUT = "foo\nbar\nbaz\n"
-    SECOND_INPUT = "razz\nma\ntazz\n"
+    TOPIC = "test"
+
+    FIRST_INPUT_LIST = ["foo", "bar", "baz"]
+    FIRST_INPUT = "\n".join(FIRST_INPUT_LIST) + "\n"
+    SECOND_INPUT_LIST = ["razz", "ma", "tazz"]
+    SECOND_INPUT = "\n".join(SECOND_INPUT_LIST) + "\n"
+
+    SCHEMA = { "type": "string", "optional": False }
 
     def __init__(self, test_context):
        super(CopycatStandaloneFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
@@ -40,8 +48,18 @@ class CopycatStandaloneFileTest(KafkaTest):
 
         self.source = CopycatStandaloneService(test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
         self.sink = CopycatStandaloneService(test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
+        self.consumer_validator = ConsoleConsumer(test_context, 1, self.kafka, self.TOPIC, consumer_timeout_ms=1000)
+
+    @parametrize(converter="org.apache.kafka.copycat.json.JsonConverter", schemas=True)
+    @parametrize(converter="org.apache.kafka.copycat.json.JsonConverter", schemas=False)
+    @parametrize(converter="org.apache.kafka.copycat.storage.StringConverter", schemas=None)
+    def test_file_source_and_sink(self, converter="org.apache.kafka.json.JsonConverter", schemas=True):
+        assert converter != None, "converter type must be set"
+        # Template parameters
+        self.key_converter = converter
+        self.value_converter = converter
+        self.schemas = schemas
 
-    def test_file_source_and_sink(self):
         # These need to be set
         self.source.set_configs(self.render("copycat-standalone.properties"), self.render("copycat-file-source.properties"))
         self.sink.set_configs(self.render("copycat-standalone.properties"), self.render("copycat-file-sink.properties"))
@@ -61,6 +79,13 @@ class CopycatStandaloneFileTest(KafkaTest):
+        self.source.node.account.ssh("echo -e -n " + repr(self.SECOND_INPUT) + " >> " + self.INPUT_FILE)
         wait_until(lambda: self.validate_output(self.FIRST_INPUT + self.SECOND_INPUT), timeout_sec=60, err_msg="Sink output file never converged to the same state as the input file")
 
+        # Validate the format of the data in the Kafka topic
+        self.consumer_validator.run()
+        expected = json.dumps([line if not self.schemas else { "schema": self.SCHEMA, "payload": line } for line in self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST])
+        decoder = (json.loads if converter.endswith("JsonConverter") else str)
+        actual = json.dumps([decoder(x) for x in self.consumer_validator.messages_consumed[1]])
+        assert expected == actual, "Expected %s but saw %s in Kafka" % (expected, actual)
+
     def validate_output(self, value):
         try:
             output_hash = list(self.sink.node.account.ssh_capture("md5sum " + self.OUTPUT_FILE))[0].strip().split()[0]

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/tests/kafkatest/tests/templates/copycat-file-sink.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-file-sink.properties b/tests/kafkatest/tests/templates/copycat-file-sink.properties
index c7865a6..77c43c7 100644
--- a/tests/kafkatest/tests/templates/copycat-file-sink.properties
+++ b/tests/kafkatest/tests/templates/copycat-file-sink.properties
@@ -17,4 +17,4 @@ name=local-file-sink
 connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
 tasks.max=1
 file={{ OUTPUT_FILE }}
-topics=test
\ No newline at end of file
+topics={{ TOPIC }}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/tests/kafkatest/tests/templates/copycat-file-source.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-file-source.properties b/tests/kafkatest/tests/templates/copycat-file-source.properties
index 8612ed7..68dabc2 100644
--- a/tests/kafkatest/tests/templates/copycat-file-source.properties
+++ b/tests/kafkatest/tests/templates/copycat-file-source.properties
@@ -17,4 +17,4 @@ name=local-file-source
 connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
 tasks.max=1
 file={{ INPUT_FILE }}
-topic=test
\ No newline at end of file
+topic={{ TOPIC }}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/tests/kafkatest/tests/templates/copycat-standalone.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-standalone.properties b/tests/kafkatest/tests/templates/copycat-standalone.properties
index 5ffb487..39db6ce 100644
--- a/tests/kafkatest/tests/templates/copycat-standalone.properties
+++ b/tests/kafkatest/tests/templates/copycat-standalone.properties
@@ -15,11 +15,18 @@
 
 bootstrap.servers={{ kafka.bootstrap_servers() }}
 
-key.converter=org.apache.kafka.copycat.json.JsonConverter
-value.converter=org.apache.kafka.copycat.json.JsonConverter
-key.serializer=org.apache.kafka.copycat.json.JsonSerializer
-value.serializer=org.apache.kafka.copycat.json.JsonSerializer
-key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
-value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
+key.converter={{ key_converter|default("org.apache.kafka.copycat.json.JsonConverter") }}
+value.converter={{ value_converter|default("org.apache.kafka.copycat.json.JsonConverter") }}
+{% if key_converter is not defined or key_converter.endswith("JsonConverter") %}
+key.converter.schemas.enable={{ schemas|default(True)|string|lower }}
+{% endif %}
+{% if value_converter is not defined or value_converter.endswith("JsonConverter") %}
+value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
+{% endif %}
+
+offset.key.converter=org.apache.kafka.copycat.json.JsonConverter
+offset.value.converter=org.apache.kafka.copycat.json.JsonConverter
+offset.key.converter.schemas.enable=false
+offset.value.converter.schemas.enable=false
 
 offset.storage.file.filename={{ OFFSETS_FILE }}


Mime
View raw message