kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3740: Part I: expose StreamConfig properties in ProcessorContext
Date Thu, 30 Jun 2016 18:10:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7e9f5a7ee -> 7d6b416f9


KAFKA-3740: Part I: expose StreamConfig properties in ProcessorContext

This is part I of the work to add the StreamsConfig to ProcessorContext.

We need to access StreamsConfig in the ProcessorContext so that other components (e.g. RocksDBWindowStore
or LRUCache) can retrieve config parameters from the application.

Author: Henry Cai <hcai@pinterest.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1553 from HenryCaiHaiying/config


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7d6b416f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7d6b416f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7d6b416f

Branch: refs/heads/trunk
Commit: 7d6b416f9e75f1d9f886b9b43637c488667222e1
Parents: 7e9f5a7
Author: Henry Cai <hcai@pinterest.com>
Authored: Thu Jun 30 11:10:16 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Jun 30 11:10:16 2016 -0700

----------------------------------------------------------------------
 .../streams/processor/ProcessorContext.java     | 24 ++++++++++++++++++++
 .../internals/ProcessorContextImpl.java         | 13 +++++++++++
 .../processor/internals/StandbyContextImpl.java | 13 +++++++++++
 .../streams/state/KeyValueStoreTestDriver.java  | 10 ++++++++
 .../apache/kafka/test/MockProcessorContext.java | 10 ++++++++
 5 files changed, 70 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6b416f/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 815b5b4..acecf91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsMetrics;
 
 import java.io.File;
+import java.util.Map;
 
 /**
  * Processor context interface.
@@ -160,4 +161,27 @@ public interface ProcessorContext {
      * @return the timestamp
      */
     long timestamp();
+
+    /**
+     * Returns all the application config properties as key/value pairs.
+     *
+     * The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
+     * object and associated to the ProcessorContext.
+     *
+     * @return all the key/values from the StreamsConfig properties
+     */
+    Map<String, Object> appConfigs();
+
+    /**
+     * Returns all the application config properties with the given key prefix, as key/value pairs
+     * stripping the prefix.
+     *
+     * The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
+     * object and associated to the ProcessorContext.
+     *
+     * @param prefix the properties prefix
+     * @return the key/values matching the given prefix from the StreamsConfig properties.
+     *
+     */
+    Map<String, Object> appConfigsWithPrefix(String prefix);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6b416f/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 10e7d68..00ffb20 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.TaskId;
 
 import java.io.File;
+import java.util.Map;
 
 public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier {
 
@@ -38,6 +39,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     private final RecordCollector collector;
     private final ProcessorStateManager stateMgr;
 
+    private final StreamsConfig config;
     private final Serde<?> keySerde;
     private final Serde<?> valSerde;
 
@@ -56,6 +58,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
         this.collector = collector;
         this.stateMgr = stateMgr;
 
+        this.config = config;
         this.keySerde = config.keySerde();
         this.valSerde = config.valueSerde();
 
@@ -206,4 +209,14 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     public void schedule(long interval) {
         task.schedule(interval);
     }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        return config.originals();
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(String prefix) {
+        return config.originalsWithPrefix(prefix);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6b416f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index d4b47e2..039ab66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 
 import java.io.File;
+import java.util.Map;
 
 public class StandbyContextImpl implements ProcessorContext, RecordCollector.Supplier {
 
@@ -34,6 +35,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     private final StreamsMetrics metrics;
     private final ProcessorStateManager stateMgr;
 
+    private final StreamsConfig config;
     private final Serde<?> keySerde;
     private final Serde<?> valSerde;
 
@@ -49,6 +51,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
         this.metrics = metrics;
         this.stateMgr = stateMgr;
 
+        this.config = config;
         this.keySerde = config.keySerde();
         this.valSerde = config.valueSerde();
 
@@ -188,4 +191,14 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     public void schedule(long interval) {
         throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
     }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        return config.originals();
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(String prefix) {
+        return config.originalsWithPrefix(prefix);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6b416f/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 6d990b4..1861e06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -251,6 +251,16 @@ public class KeyValueStoreTestDriver<K, V> {
             public File stateDir() {
                 return stateDir;
             }
+
+            @Override
+            public Map<String, Object> appConfigs() {
+                return null;
+            }
+
+            @Override
+            public Map<String, Object> appConfigsWithPrefix(String prefix) {
+                return null;
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6b416f/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 2e2c221..dba82ca 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -189,6 +189,16 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         return this.timestamp;
     }
 
+    @Override
+    public Map<String, Object> appConfigs() {
+        return null;
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(String prefix) {
+        return null;
+    }
+
     public Map<String, StateStore> allStateStores() {
         return Collections.unmodifiableMap(storeMap);
     }


Mime
View raw message