kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4619: Disallow to output records with unknown keys in TransformValues
Date Mon, 16 Jan 2017 19:24:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b712b8675 -> a8eb1d13e


KAFKA-4619: Disallow to output records with unknown keys in TransformValues

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy, Guozhang Wang

Closes #2346 from mjsax/kafka-4619-fixTransformValues


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a8eb1d13
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a8eb1d13
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a8eb1d13

Branch: refs/heads/trunk
Commit: a8eb1d13e0d962ca3bb3f7708cbf92bd97b54ffd
Parents: b712b86
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Mon Jan 16 11:24:20 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Jan 16 11:24:20 2017 -0800

----------------------------------------------------------------------
 .../internals/KStreamTransformValues.java       | 119 ++++++++++++++++++-
 .../internals/KStreamTransformValuesTest.java   |  81 ++++++++++++-
 2 files changed, 189 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a8eb1d13/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index cb9aab1..f689c95 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -17,11 +17,22 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
 
 public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
{
 
@@ -38,6 +49,7 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K,
V>
 
     public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K,
V> {
 
+        private static final Logger log = LoggerFactory.getLogger(KStreamTransformValuesProcessor.class);
         private final ValueTransformer<V, R> valueTransformer;
         private ProcessorContext context;
 
@@ -46,8 +58,104 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K,
V>
         }
 
         @Override
-        public void init(ProcessorContext context) {
-            valueTransformer.init(context);
+        public void init(final ProcessorContext context) {
+            valueTransformer.init(
+                new ProcessorContext() {
+                    @Override
+                    public String applicationId() {
+                        return context.applicationId();
+                    }
+
+                    @Override
+                    public TaskId taskId() {
+                        return context.taskId();
+                    }
+
+                    @Override
+                    public Serde<?> keySerde() {
+                        return context.keySerde();
+                    }
+
+                    @Override
+                    public Serde<?> valueSerde() {
+                        return context.valueSerde();
+                    }
+
+                    @Override
+                    public File stateDir() {
+                        return context.stateDir();
+                    }
+
+                    @Override
+                    public StreamsMetrics metrics() {
+                        return context.metrics();
+                    }
+
+                    @Override
+                    public void register(final StateStore store, final boolean loggingEnabled,
final StateRestoreCallback stateRestoreCallback) {
+                        context.register(store, loggingEnabled, stateRestoreCallback);
+                    }
+
+                    @Override
+                    public StateStore getStateStore(final String name) {
+                        return context.getStateStore(name);
+                    }
+
+                    @Override
+                    public void schedule(final long interval) {
+                        context.schedule(interval);
+                    }
+
+                    @Override
+                    public <K, V> void forward(final K key, final V value) {
+                        throw new StreamsException("ProcessorContext#forward() must not be
called within TransformValues.");
+                    }
+
+                    @Override
+                    public <K, V> void forward(final K key, final V value, final int
childIndex) {
+                        throw new StreamsException("ProcessorContext#forward() must not be
called within TransformValues.");
+                    }
+
+                    @Override
+                    public <K, V> void forward(final K key, final V value, final String
childName) {
+                        throw new StreamsException("ProcessorContext#forward() must not be
called within TransformValues.");
+                    }
+
+                    @Override
+                    public void commit() {
+                        context.commit();
+                    }
+
+                    @Override
+                    public String topic() {
+                        return context.topic();
+                    }
+
+                    @Override
+                    public int partition() {
+                        return context.partition();
+                    }
+
+                    @Override
+                    public long offset() {
+                        return context.offset();
+                    }
+
+                    @Override
+                    public long timestamp() {
+                        return context.timestamp();
+                    }
+
+                    @Override
+                    public Map<String, Object> appConfigs() {
+                        return context.appConfigs();
+                    }
+
+                    @Override
+                    public Map<String, Object> appConfigsWithPrefix(String prefix)
{
+                        return context.appConfigsWithPrefix(prefix);
+                    }
+                });
             this.context = context;
         }
 
@@ -58,10 +166,9 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K,
V>
 
         @Override
         public void punctuate(long timestamp) {
-            R ret = valueTransformer.punctuate(timestamp);
-
-            if (ret != null)
-                context.forward(null, ret);
+            if (valueTransformer.punctuate(timestamp) != null) {
+                throw new StreamsException("ValueTransformer#punctuate must return null.");
+            }
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a8eb1d13/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 557388d..0fb0823 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -19,10 +19,12 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -30,6 +32,7 @@ import org.junit.After;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class KStreamTransformValuesTest {
 
@@ -70,7 +73,7 @@ public class KStreamTransformValuesTest {
 
                         @Override
                         public Integer punctuate(long timestamp) {
-                            return (int) timestamp;
+                            return null;
                         }
 
                         @Override
@@ -94,14 +97,82 @@ public class KStreamTransformValuesTest {
 
         assertEquals(4, processor.processed.size());
 
-        driver.punctuate(2);
-        driver.punctuate(3);
-
-        String[] expected = {"1:10", "10:110", "100:1110", "1000:11110", "null:2", "null:3"};
+        String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
 
         for (int i = 0; i < expected.length; i++) {
             assertEquals(expected[i], processor.processed.get(i));
         }
     }
 
+    @Test
+    public void shouldNotAllowValueTransformerToCallInternalProcessorContextMethods() {
+        final KStreamTransformValues<Integer, Integer, Integer> transformValue = new
KStreamTransformValues<>(new ValueTransformerSupplier<Integer, Integer>() {
+            @Override
+            public ValueTransformer<Integer, Integer> get() {
+                return new BadValueTransformer();
+            }
+        });
+
+        final Processor transformValueProcessor = transformValue.get();
+        transformValueProcessor.init(null);
+
+        try {
+            transformValueProcessor.process(null, 0);
+            fail("should not allow call to context.forward() within ValueTransformer");
+        } catch (final StreamsException e) {
+            // expected
+        }
+
+        try {
+            transformValueProcessor.process(null, 1);
+            fail("should not allow call to context.forward() within ValueTransformer");
+        } catch (final StreamsException e) {
+            // expected
+        }
+
+        try {
+            transformValueProcessor.process(null, 2);
+            fail("should not allow call to context.forward() within ValueTransformer");
+        } catch (final StreamsException e) {
+            // expected
+        }
+
+        try {
+            transformValueProcessor.punctuate(0);
+            fail("should not allow ValueTransformer#puntuate() to return not-null value");
+        } catch (final StreamsException e) {
+            // expected
+        }
+    }
+
+    private static final class BadValueTransformer implements ValueTransformer<Integer,
Integer> {
+        private ProcessorContext context;
+
+        @Override
+        public void init(ProcessorContext context) {
+            this.context = context;
+        }
+
+        @Override
+        public Integer transform(Integer value) {
+            if (value == 0) {
+                context.forward(null, null);
+            }
+            if (value == 1) {
+                context.forward(null, null, null);
+            }
+            if (value == 2) {
+                context.forward(null, null, 0);
+            }
+            throw new RuntimeException("Should never happen in this test");
+        }
+
+        @Override
+        public Integer punctuate(long timestamp) {
+            return 1; // any not-null value
+        }
+
+        @Override
+        public void close() { }
+    }
 }


Mime
View raw message