This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new ea7d2b5 KAFKA-9533: ValueTransform forwards `null` values (#8108)
ea7d2b5 is described below
commit ea7d2b56320635de9a3082812e41ba2e782efe01
Author: Michael Viamari <mviamari@users.noreply.github.com>
AuthorDate: Wed Feb 19 13:20:35 2020 -0800
KAFKA-9533: ValueTransform forwards `null` values (#8108)
Fixes a bug where KStream#transformValues would forward null values from the provided
ValueTransform#transform operation.
A test was added for verification KStreamTransformValuesTest#shouldEmitNoRecordIfTransformReturnsNull.
A parallel test for non-key ValueTransformer was not added, as they share the same code path.
Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
---
.../kstream/internals/KStreamTransformValues.java | 5 ++++-
.../internals/KStreamTransformValuesTest.java | 20 ++++++++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index 843606b..06216fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -53,7 +53,10 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K,
V>
@Override
public void process(final K key, final V value) {
- context.forward(key, valueTransformer.transform(key, value));
+ final R transformedValue = valueTransformer.transform(key, value);
+ if (transformedValue != null) {
+ context.forward(key, transformedValue);
+ }
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index a786166..ca82a90 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.SingletonNoOpValueTransformer;
import org.apache.kafka.test.StreamsTestUtils;
+import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
@@ -41,6 +42,7 @@ import org.junit.runner.RunWith;
import java.util.Properties;
+import static org.easymock.EasyMock.mock;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
@@ -129,6 +131,24 @@ public class KStreamTransformValuesTest {
assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
}
+ @Test
+ public void shouldEmitNoRecordIfTransformReturnsNull() {
+ final ProcessorContext context = mock(ProcessorContext.class);
+ final ValueTransformerWithKey<Integer, Integer, Integer> valueTransformer =
mock(ValueTransformerWithKey.class);
+ final KStreamTransformValues.KStreamTransformValuesProcessor<Integer, Integer,
Integer> processor =
+ new KStreamTransformValues.KStreamTransformValuesProcessor<>(valueTransformer);
+ processor.init(context);
+
+ final Integer inputKey = 1;
+ final Integer inputValue = 10;
+ EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andStubReturn(null);
+ EasyMock.replay(context);
+
+ processor.process(inputKey, inputValue);
+
+ EasyMock.verify(context);
+ }
+
@SuppressWarnings("unchecked")
@Test
public void shouldInitializeTransformerWithForwardDisabledProcessorContext() {
|