kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch 2.2 updated: Revert "KAFKA-9533: ValueTransform forwards `null` values (#8108)"
Date Tue, 25 Feb 2020 18:09:48 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 7bab36d  Revert "KAFKA-9533: ValueTransform forwards `null` values (#8108)"
7bab36d is described below

commit 7bab36dc97c30043fbbd09174e056ec7ed8e4f43
Author: bill <bbejeck@gmail.com>
AuthorDate: Tue Feb 25 13:03:33 2020 -0500

    Revert "KAFKA-9533: ValueTransform forwards `null` values (#8108)"
    
    This reverts commit a41d3d86c13a55a75e67b1635bc74361ebe6d7af.
---
 .../kstream/internals/KStreamTransformValues.java    |  5 +----
 .../internals/KStreamTransformValuesTest.java        | 20 --------------------
 2 files changed, 1 insertion(+), 24 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index 06216fc..843606b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -53,10 +53,7 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
 
         @Override
         public void process(final K key, final V value) {
-            final R transformedValue = valueTransformer.transform(key, value);
-            if (transformedValue != null) {
-                context.forward(key, transformedValue);
-            }
+            context.forward(key, valueTransformer.transform(key, value));
         }
 
         @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index e05bc27..94d06eb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -33,7 +33,6 @@ import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.SingletonNoOpValueTransformer;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.Mock;
 import org.easymock.MockType;
@@ -42,7 +41,6 @@ import org.junit.runner.RunWith;
 
 import java.util.Properties;
 
-import static org.easymock.EasyMock.mock;
 import static org.hamcrest.CoreMatchers.isA;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
@@ -144,24 +142,6 @@ public class KStreamTransformValuesTest {
         assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
     }
 
-    @Test
-    public void shouldEmitNoRecordIfTransformReturnsNull() {
-        final ProcessorContext context = mock(ProcessorContext.class);
-        final ValueTransformerWithKey<Integer, Integer, Integer> valueTransformer = mock(ValueTransformerWithKey.class);
-        final KStreamTransformValues.KStreamTransformValuesProcessor<Integer, Integer, Integer> processor =
-            new KStreamTransformValues.KStreamTransformValuesProcessor<>(valueTransformer);
-        processor.init(context);
-
-        final Integer inputKey = 1;
-        final Integer inputValue = 10;
-        EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andStubReturn(null);
-        EasyMock.replay(context);
-
-        processor.process(inputKey, inputValue);
-
-        EasyMock.verify(context);
-    }
-
     @SuppressWarnings("unchecked")
     @Test
     public void shouldInitializeTransformerWithForwardDisabledProcessorContext() {


Mime
View raw message