kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5379 follow up: reduce redundant mock processor context
Date Thu, 31 Aug 2017 15:43:14 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9836adc73 -> 9ebc303bb


KAFKA-5379 follow up: reduce redundant mock processor context

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>

Closes #3757 from guozhangwang/K5379-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9ebc303b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9ebc303b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9ebc303b

Branch: refs/heads/trunk
Commit: 9ebc303bb85bf5aae2b732c60d3b3e0a6e07512d
Parents: 9836adc
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Thu Aug 31 08:43:13 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Aug 31 08:43:13 2017 -0700

----------------------------------------------------------------------
 .../internals/AbstractProcessorContext.java     |   4 +-
 .../processor/internals/ProcessorNodeTest.java  |   6 +-
 .../state/internals/RocksDBStoreTest.java       |  49 +-----
 .../apache/kafka/test/MockProcessorContext.java | 160 +++++++------------
 4 files changed, 66 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9ebc303b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 2c28226..3c8e077 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -41,8 +41,8 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     private final ThreadCache cache;
     private final Serde valueSerde;
     private boolean initialized;
-    private RecordContext recordContext;
-    private ProcessorNode currentNode;
+    protected RecordContext recordContext;
+    protected ProcessorNode currentNode;
     final StateManager stateManager;
 
     public AbstractProcessorContext(final TaskId taskId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ebc303b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index ab29c5c..f37a674 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -102,16 +102,16 @@ public class ProcessorNodeTest {
                 "The average number of occurrence of " + opName + " operation per second.",
metricTags)));
 
     }
+
     @Test
     public void testMetrics() {
         final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class,
Bytes.class);
 
-        final MockProcessorContext context = new MockProcessorContext(anyStateSerde,  new
RecordCollectorImpl(null, null));
+        final Metrics metrics = new Metrics();
+        final MockProcessorContext context = new MockProcessorContext(anyStateSerde,  new
RecordCollectorImpl(null, null), metrics);
         final ProcessorNode node = new ProcessorNode("name", new NoOpProcessor(), Collections.emptySet());
         node.init(context);
 
-        Metrics metrics = context.baseMetrics();
-        String name = "task." + context.taskId();
         String[] latencyOperations = {"process", "punctuate", "create", "destroy"};
         String throughputOperation =  "forward";
         String groupName = "stream-processor-node-metrics";

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ebc303b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 8b916cb..2f93a7c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -22,11 +22,6 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -106,7 +101,7 @@ public class RocksDBStoreTest {
         configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test-server:9092");
         configs.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
         MockRocksDbConfigSetter.called = false;
-        subject.openDB(new ConfigurableProcessorContext(new StreamsConfig(configs), tempDir));
+        subject.openDB(new MockProcessorContext(tempDir, new StreamsConfig(configs)));
 
         assertTrue(MockRocksDbConfigSetter.called);
     }
@@ -330,46 +325,4 @@ public class RocksDBStoreTest {
         entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
         return entries;
     }
-
-    private static class ConfigurableProcessorContext extends AbstractProcessorContext {
-        private final File stateDir;
-
-        ConfigurableProcessorContext(final StreamsConfig config,
-                                     final File stateDir) {
-            super(null, null, config, null, null, null);
-            this.stateDir = stateDir;
-        }
-
-        @Override
-        public File stateDir() {
-            return stateDir;
-        }
-
-        @Override
-        public StateStore getStateStore(final String name) {
-            return null;
-        }
-
-        @Override
-        public Cancellable schedule(final long interval,
-                                    final PunctuationType type,
-                                    final Punctuator callback) {
-            return null;
-        }
-
-        @Override
-        public void schedule(long interval) { }
-
-        @Override
-        public <K, V> void forward(K key, V value) { }
-
-        @Override
-        public <K, V> void forward(K key, V value, int childIndex) { }
-
-        @Override
-        public <K, V> void forward(K key, V value, String childName) { }
-
-        @Override
-        public void commit() { }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ebc303b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index b34b201..edbff13 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -16,14 +16,10 @@
  */
 package org.apache.kafka.test;
 
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
@@ -32,13 +28,12 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
 import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.apache.kafka.streams.processor.internals.WrappedBatchingStateRestoreCallback;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -51,54 +46,70 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-public class MockProcessorContext implements InternalProcessorContext, RecordCollector.Supplier
{
+public class MockProcessorContext extends AbstractProcessorContext implements RecordCollector.Supplier
{
 
+    private final File stateDir;
+    private final Metrics metrics;
     private final Serde<?> keySerde;
     private final Serde<?> valSerde;
     private final RecordCollector.Supplier recordCollectorSupplier;
-    private final File stateDir;
-    private final Metrics metrics;
-    private final StreamsMetrics streamsMetrics;
-    private final ThreadCache cache;
     private final Map<String, StateStore> storeMap = new LinkedHashMap<>();
-
     private final Map<String, StateRestoreCallback> restoreFuncs = new HashMap<>();
 
     private long timestamp = -1L;
-    private RecordContext recordContext;
-    private ProcessorNode currentNode;
 
-    public MockProcessorContext(final StateSerdes<?, ?> serdes, final RecordCollector
collector) {
+    public MockProcessorContext(final File stateDir,
+                                final StreamsConfig config) {
+        this(stateDir, null, null, new Metrics(), config, null, null);
+    }
+
+    public MockProcessorContext(final StateSerdes<?, ?> serdes,
+                                final RecordCollector collector) {
         this(null, serdes.keySerde(), serdes.valueSerde(), collector, null);
     }
 
+    public MockProcessorContext(final StateSerdes<?, ?> serdes,
+                                final RecordCollector collector,
+                                final Metrics metrics) {
+        this(null, serdes.keySerde(), serdes.valueSerde(), metrics, new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
new RecordCollector.Supplier() {
+            @Override
+            public RecordCollector recordCollector() {
+                return collector;
+            }
+        }, null);
+    }
+
     public MockProcessorContext(final File stateDir,
                                 final Serde<?> keySerde,
                                 final Serde<?> valSerde,
                                 final RecordCollector collector,
                                 final ThreadCache cache) {
-        this(stateDir, keySerde, valSerde,
-                new RecordCollector.Supplier() {
-                    @Override
-                    public RecordCollector recordCollector() {
-                        return collector;
-                    }
-                },
-                cache);
+        this(stateDir, keySerde, valSerde, new Metrics(), new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
new RecordCollector.Supplier() {
+            @Override
+            public RecordCollector recordCollector() {
+                return collector;
+            }
+        }, cache);
     }
 
-    public MockProcessorContext(final File stateDir,
+    private MockProcessorContext(final File stateDir,
                                 final Serde<?> keySerde,
                                 final Serde<?> valSerde,
+                                final Metrics metrics,
+                                final StreamsConfig config,
                                 final RecordCollector.Supplier collectorSupplier,
                                 final ThreadCache cache) {
+        super(new TaskId(0, 0),
+                config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
+                config,
+                new MockStreamsMetrics(metrics),
+                null,
+                cache);
         this.stateDir = stateDir;
         this.keySerde = keySerde;
         this.valSerde = valSerde;
-        recordCollectorSupplier = collectorSupplier;
-        metrics = new Metrics(new MetricConfig(), Collections.singletonList((MetricsReporter)
new JmxReporter()), new MockTime(), true);
-        this.cache = cache;
-        streamsMetrics = new MockStreamsMetrics(metrics);
+        this.metrics = metrics;
+        this.recordCollectorSupplier = collectorSupplier;
     }
 
     @Override
@@ -111,27 +122,7 @@ public class MockProcessorContext implements InternalProcessorContext,
RecordCol
         return recordCollector;
     }
 
-    public void setTime(final long timestamp) {
-        if (recordContext != null) {
-            recordContext = new ProcessorRecordContext(timestamp, recordContext.offset(),
recordContext.partition(), recordContext.topic());
-        }
-        this.timestamp = timestamp;
-    }
-
-    public Metrics baseMetrics() {
-        return metrics;
-    }
-
-    @Override
-    public TaskId taskId() {
-        return new TaskId(0, 0);
-    }
-
-    @Override
-    public String applicationId() {
-        return "mockApplication";
-    }
-
+    // serdes will override whatever specified in the configs
     @Override
     public Serde<?> keySerde() {
         return keySerde;
@@ -142,11 +133,7 @@ public class MockProcessorContext implements InternalProcessorContext,
RecordCol
         return valSerde;
     }
 
-    @Override
-    public ThreadCache getCache() {
-        return cache;
-    }
-
+    // state mgr will be overridden by the state dir and store maps
     @Override
     public void initialized() {}
 
@@ -160,11 +147,6 @@ public class MockProcessorContext implements InternalProcessorContext,
RecordCol
     }
 
     @Override
-    public StreamsMetrics metrics() {
-        return streamsMetrics;
-    }
-
-    @Override
     public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback
func) {
         storeMap.put(store.name(), store);
         restoreFuncs.put(store.name(), func);
@@ -185,6 +167,11 @@ public class MockProcessorContext implements InternalProcessorContext,
RecordCol
     }
 
     @Override
+    public void commit() {
+        throw new UnsupportedOperationException("commit() not supported.");
+    }
+
+    @Override
     @SuppressWarnings("unchecked")
     public <K, V> void forward(final K key, final V value) {
         final ProcessorNode thisNode = currentNode;
@@ -228,10 +215,21 @@ public class MockProcessorContext implements InternalProcessorContext,
RecordCol
         }
     }
 
+    // allow only setting time but not other fields in for record context,
+    // and also not throwing exceptions if record context is not available.
+    public void setTime(final long timestamp) {
+        if (recordContext != null) {
+            recordContext = new ProcessorRecordContext(timestamp, recordContext.offset(),
recordContext.partition(), recordContext.topic());
+        }
+        this.timestamp = timestamp;
+    }
 
     @Override
-    public void commit() {
-        throw new UnsupportedOperationException("commit() not supported.");
+    public long timestamp() {
+        if (recordContext == null) {
+            return timestamp;
+        }
+        return recordContext.timestamp();
     }
 
     @Override
@@ -258,29 +256,6 @@ public class MockProcessorContext implements InternalProcessorContext,
RecordCol
         return recordContext.offset();
     }
 
-    @Override
-    public long timestamp() {
-        if (recordContext == null) {
-            return timestamp;
-        }
-        return recordContext.timestamp();
-    }
-
-    @Override
-    public Map<String, Object> appConfigs() {
-        return Collections.emptyMap();
-    }
-
-    @Override
-    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
-        return Collections.emptyMap();
-    }
-
-    @Override
-    public RecordContext recordContext() {
-        return recordContext;
-    }
-
     Map<String, StateStore> allStateStores() {
         return Collections.unmodifiableMap(storeMap);
     }
@@ -302,21 +277,6 @@ public class MockProcessorContext implements InternalProcessorContext,
RecordCol
         restoreListener.onRestoreEnd(null, storeName, 0L);
     }
 
-    @Override
-    public void setRecordContext(final RecordContext recordContext) {
-        this.recordContext = recordContext;
-    }
-
-    @Override
-    public void setCurrentNode(final ProcessorNode currentNode) {
-        this.currentNode = currentNode;
-    }
-
-    @Override
-    public ProcessorNode currentNode() {
-        return currentNode;
-    }
-
     public void close() {
         metrics.close();
     }


Mime
View raw message