From: mjsax@apache.org
Reply-To: dev@kafka.apache.org
To: commits@kafka.apache.org
Date: Thu, 04 Jun 2020 23:17:18 +0000
Subject: [kafka] branch 2.6 updated: KAFKA-10066: TestOutputTopic should pass record headers into deserializers (#8759)
Message-ID: <159131263462.5143.7618026946453901723@gitbox.apache.org>
X-Git-Host: gitbox.apache.org
X-Git-Repo: kafka
X-Git-Refname: refs/heads/2.6
X-Git-Reftype: branch
X-Git-Oldrev: cb310f1e4350e98e56c7df9cb38d827eb6304667
X-Git-Newrev: 382bb28ac960aafe9fa5eae791606cabd903c52c
X-Git-Rev: 382bb28ac960aafe9fa5eae791606cabd903c52c
X-Git-NotificationType: ref_changed_plus_diff
Auto-Submitted: auto-generated

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 382bb28  KAFKA-10066: TestOutputTopic should pass record headers into deserializers (#8759)
382bb28 is described below

commit 382bb28ac960aafe9fa5eae791606cabd903c52c
Author: Matthias J. Sax
AuthorDate: Thu Jun 4 16:00:59 2020 -0700

    KAFKA-10066: TestOutputTopic should pass record headers into deserializers (#8759)

    Reviewers: John Roesler, Boyang Chen, Chia-Ping Tsai
---
 .../apache/kafka/streams/TopologyTestDriver.java  |  8 ++--
 .../kafka/streams/TopologyTestDriverTest.java     | 54 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 923a980..db31012 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -802,8 +802,8 @@ public class TopologyTestDriver implements Closeable {
         if (record == null) {
             return null;
         }
-        final K key = keyDeserializer.deserialize(record.topic(), record.key());
-        final V value = valueDeserializer.deserialize(record.topic(), record.value());
+        final K key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key());
+        final V value = valueDeserializer.deserialize(record.topic(), record.headers(), record.value());
         return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value, record.headers());
     }
 
@@ -906,8 +906,8 @@ public class TopologyTestDriver implements Closeable {
         if (record == null) {
             throw new NoSuchElementException("Empty topic: " + topic);
         }
-        final K key = keyDeserializer.deserialize(record.topic(), record.key());
-        final V value = valueDeserializer.deserialize(record.topic(), record.value());
+        final K key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key());
+        final V value = valueDeserializer.deserialize(record.topic(), record.headers(), record.value());
         return new TestRecord<>(key, value, record.headers(), record.timestamp());
     }
 
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 793f907..9b7b554 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -22,10 +22,13 @@ import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.SystemTime;
@@ -70,6 +73,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -712,6 +716,56 @@ public class TopologyTestDriverTest {
     }
 
     @Test
+    public void shouldPassRecordHeadersIntoSerializersAndDeserializers() {
+        testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
+
+        final AtomicBoolean passedHeadersToKeySerializer = new AtomicBoolean(false);
+        final AtomicBoolean passedHeadersToValueSerializer = new AtomicBoolean(false);
+        final AtomicBoolean passedHeadersToKeyDeserializer = new AtomicBoolean(false);
+        final AtomicBoolean passedHeadersToValueDeserializer = new AtomicBoolean(false);
+
+        final Serializer<byte[]> keySerializer = new ByteArraySerializer() {
+            @Override
+            public byte[] serialize(final String topic, final Headers headers, final byte[] data) {
+                passedHeadersToKeySerializer.set(true);
+                return serialize(topic, data);
+            }
+        };
+        final Serializer<byte[]> valueSerializer = new ByteArraySerializer() {
+            @Override
+            public byte[] serialize(final String topic, final Headers headers, final byte[] data) {
+                passedHeadersToValueSerializer.set(true);
+                return serialize(topic, data);
+            }
+        };
+
+        final Deserializer<byte[]> keyDeserializer = new ByteArrayDeserializer() {
+            @Override
+            public byte[] deserialize(final String topic, final Headers headers, final byte[] data) {
+                passedHeadersToKeyDeserializer.set(true);
+                return deserialize(topic, data);
+            }
+        };
+        final Deserializer<byte[]> valueDeserializer = new ByteArrayDeserializer() {
+            @Override
+            public byte[] deserialize(final String topic, final Headers headers, final byte[] data) {
+                passedHeadersToValueDeserializer.set(true);
+                return deserialize(topic, data);
+            }
+        };
+
+        final TestInputTopic<byte[], byte[]> inputTopic = testDriver.createInputTopic(SOURCE_TOPIC_1, keySerializer, valueSerializer);
+        final TestOutputTopic<byte[], byte[]> outputTopic = testDriver.createOutputTopic(SINK_TOPIC_1, keyDeserializer, valueDeserializer);
+        inputTopic.pipeInput(testRecord1);
+        outputTopic.readRecord();
+
+        assertThat(passedHeadersToKeySerializer.get(), equalTo(true));
+        assertThat(passedHeadersToValueSerializer.get(), equalTo(true));
+        assertThat(passedHeadersToKeyDeserializer.get(), equalTo(true));
+        assertThat(passedHeadersToValueDeserializer.get(), equalTo(true));
+    }
+
+    @Test
     public void shouldUseSinkSpecificSerializers() {
         final Topology topology = new Topology();
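
For context (not part of the commit): below is a minimal sketch of the kind of header-aware deserializer that this change makes testable through TestOutputTopic, since TopologyTestDriver now calls the three-argument deserialize(topic, headers, data) overload. The pass-through topology, the topic names, the "upper" header key, and the class names are illustrative assumptions, not taken from the Kafka code base.

import java.util.Properties;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.TestRecord;

public class HeaderAwareDeserializerExample {

    // A deserializer whose behavior depends on a record header. Before this fix,
    // TestOutputTopic only invoked the two-argument overload, so this override
    // was never reached from TopologyTestDriver.
    static class UpperCasingDeserializer extends StringDeserializer {
        @Override
        public String deserialize(final String topic, final Headers headers, final byte[] data) {
            final String value = super.deserialize(topic, data);
            if (headers != null && headers.lastHeader("upper") != null) {
                return value.toUpperCase();
            }
            return value;
        }
    }

    public static void main(final String[] args) {
        // Simple pass-through topology; "input" and "output" are illustrative topic names.
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .to("output", Produced.with(Serdes.String(), Serdes.String()));
        final Topology topology = builder.build();

        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "header-aware-example");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic("input", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> outputTopic =
                driver.createOutputTopic("output", new StringDeserializer(), new UpperCasingDeserializer());

            // Attach an "upper" header; stateless operators forward headers unchanged.
            final Headers headers = new RecordHeaders();
            headers.add("upper", new byte[0]);
            inputTopic.pipeInput(new TestRecord<>("k", "hello", headers));

            // With headers forwarded to the deserializer, the value comes back upper-cased.
            final TestRecord<String, String> result = outputTopic.readRecord();
            System.out.println(result.value()); // HELLO
        }
    }
}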