kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR: remove unused MeteredKeyValueStore (#5380)
Date Wed, 18 Jul 2018 16:31:00 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 06d9662  MINOR: remove unused MeteredKeyValueStore (#5380)
06d9662 is described below

commit 06d96628f0e098d93aecc650534c9e9965127d92
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Wed Jul 18 09:30:52 2018 -0700

    MINOR: remove unused MeteredKeyValueStore (#5380)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../state/internals/InMemoryKeyValueStore.java     |  44 +--
 .../state/internals/InnerMeteredKeyValueStore.java | 339 ---------------------
 .../state/internals/KeyValueStoreBuilder.java      |  12 +-
 .../state/internals/MeteredKeyValueBytesStore.java | 151 ---------
 .../state/internals/MeteredKeyValueStore.java      | 315 +++++++++++++++----
 .../streams/state/internals/WrappedStateStore.java |   7 +-
 .../internals/KeyValueStoreMaterializerTest.java   |   4 +-
 .../streams/state/KeyValueStoreTestDriver.java     |   3 +-
 .../state/internals/KeyValueStoreBuilderTest.java  |  23 +-
 ...toreTest.java => MeteredKeyValueStoreTest.java} |  22 +-
 10 files changed, 325 insertions(+), 595 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 9ea75a9..5a6b618 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -32,7 +31,6 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
-
 public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
     private final String name;
     private final Serde<K> keySerde;
@@ -42,7 +40,9 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     private StateSerdes<K, V> serdes;
 
-    public InMemoryKeyValueStore(final String name, final Serde<K> keySerde, final Serde<V> valueSerde) {
+    public InMemoryKeyValueStore(final String name,
+                                 final Serde<K> keySerde,
+                                 final Serde<V> valueSerde) {
         this.name = name;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
@@ -63,7 +63,8 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     @SuppressWarnings("unchecked")
-    public void init(final ProcessorContext context, final StateStore root) {
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
         // construct the serde
         this.serdes = new StateSerdes<>(
             ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
@@ -72,15 +73,12 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
         if (root != null) {
             // register the store
-            context.register(root, new StateRestoreCallback() {
-                @Override
-                public void restore(byte[] key, byte[] value) {
-                    // this is a delete
-                    if (value == null) {
-                        delete(serdes.keyFrom(key));
-                    } else {
-                        put(serdes.keyFrom(key), serdes.valueFrom(value));
-                    }
+            context.register(root, (key, value) -> {
+                // this is a delete
+                if (value == null) {
+                    delete(serdes.keyFrom(key));
+                } else {
+                    put(serdes.keyFrom(key), serdes.valueFrom(value));
                 }
             });
         }
@@ -104,7 +102,8 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public synchronized void put(final K key, final V value) {
+    public synchronized void put(final K key,
+                                 final V value) {
         if (value == null) {
             this.map.remove(key);
         } else {
@@ -113,8 +112,9 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public synchronized V putIfAbsent(final K key, final V value) {
-        V originalValue = get(key);
+    public synchronized V putIfAbsent(final K key,
+                                      final V value) {
+        final V originalValue = get(key);
         if (originalValue == null) {
             put(key, value);
         }
@@ -123,8 +123,9 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public synchronized void putAll(final List<KeyValue<K, V>> entries) {
-        for (KeyValue<K, V> entry : entries)
+        for (final KeyValue<K, V> entry : entries) {
             put(entry.key, entry.value);
+        }
     }
 
     @Override
@@ -133,8 +134,11 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public synchronized KeyValueIterator<K, V> range(final K from, final K to) {
-        return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator()));
+    public synchronized KeyValueIterator<K, V> range(final K from,
+                                                     final K to) {
+        return new DelegatingPeekingKeyValueIterator<>(
+            name,
+            new InMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator()));
     }
 
     @Override
@@ -173,7 +177,7 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
         @Override
         public KeyValue<K, V> next() {
-            Map.Entry<K, V> entry = iter.next();
+            final Map.Entry<K, V> entry = iter.next();
             return new KeyValue<>(entry.getKey(), entry.getValue());
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
deleted file mode 100644
index 200b2d7..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-
-import java.util.List;
-
-/**
- * Metered {@link KeyValueStore} wrapper is used for recording operation metrics, and hence its
- * inner KeyValueStore implementation do not need to provide its own metrics collecting functionality.
- *
- * @param <K>
- * @param <V>
- */
-class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V> {
-
-    private final KeyValueStore<IK, IV> inner;
-    private final String metricScope;
-    // convert types from outer store type to inner store type
-    private final TypeConverter<K, IK, V, IV> typeConverter;
-    protected final Time time;
-    private Sensor putTime;
-    private Sensor putIfAbsentTime;
-    private Sensor getTime;
-    private Sensor deleteTime;
-    private Sensor putAllTime;
-    private Sensor allTime;
-    private Sensor rangeTime;
-    private Sensor flushTime;
-    private StreamsMetricsImpl metrics;
-    private ProcessorContext context;
-    private StateStore root;
-
-    /**
-     * For a period of time we will have 2 store hierarchies. 1 which is built by a
-     * {@link org.apache.kafka.streams.state.StoreSupplier} where the outer most store will be of user defined
-     * type, i.e, &lt;String,Integer&gt;, and another where the outermost store will be of type &lt;Bytes,byte[]&gt;
-     * This interface is so we don't need to have 2 complete implementations for collecting the metrics, rather
-     * we just provide an instance of this to do the type conversions from the outer store types to the inner store types.
-     * @param <K>  key type of the outer store
-     * @param <IK> key type of the inner store
-     * @param <V>  value type of the outer store
-     * @param <IV> value type of the inner store
-     */
-    interface TypeConverter<K, IK, V, IV> {
-        IK innerKey(final K key);
-        IV innerValue(final V value);
-        List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
-        V outerValue(final IV value);
-        KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
-        K outerKey(final IK ik);
-    }
-
-    // always wrap the store with the metered store
-    InnerMeteredKeyValueStore(final KeyValueStore<IK, IV> inner,
-                              final String metricScope,
-                              final TypeConverter<K, IK, V, IV> typeConverter,
-                              final Time time) {
-        super(inner);
-        this.inner = inner;
-        this.metricScope = metricScope;
-        this.typeConverter = typeConverter;
-        this.time = time != null ? time : Time.SYSTEM;
-    }
-
-    @Override
-    public void init(ProcessorContext context, StateStore root) {
-        final String name = name();
-        final String tagKey = "task-id";
-        final String taskName = context.taskId().toString();
-        this.context = context;
-        this.root = root;
-        this.metrics = (StreamsMetricsImpl) context.metrics();
-        this.putTime = this.metrics.addLatencyAndThroughputSensor(taskName,
-                                                                  metricScope,
-                                                                  name,
-                                                                  "put",
-                                                                  Sensor.RecordingLevel.DEBUG,
-                                                                  tagKey, taskName);
-        this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(taskName,
-                                                                          metricScope,
-                                                                          name,
-                                                                          "put-if-absent",
-                                                                          Sensor.RecordingLevel.DEBUG,
-                                                                          tagKey, taskName);
-        this.getTime = this.metrics.addLatencyAndThroughputSensor(taskName,
-                                                                  metricScope,
-                                                                  name,
-                                                                  "get",
-                                                                  Sensor.RecordingLevel.DEBUG,
-                                                                  tagKey, taskName);
-        this.deleteTime = this.metrics.addLatencyAndThroughputSensor(taskName,
-                                                                     metricScope,
-                                                                     name,
-                                                                     "delete",
-                                                                     Sensor.RecordingLevel.DEBUG,
-                                                                     tagKey, taskName);
-        this.putAllTime = this.metrics.addLatencyAndThroughputSensor(taskName,
-                                                                     metricScope,
-                                                                     name,
-                                                                     "put-all",
-                                                                     Sensor.RecordingLevel.DEBUG,
-                                                                     tagKey, taskName);
-        this.allTime = this.metrics.addLatencyAndThroughputSensor(taskName,
-                                                                  metricScope,
-                                                                  name,
-                                                                  "all",
-                                                                  Sensor.RecordingLevel.DEBUG,
-                                                                  tagKey, taskName);
-        this.rangeTime = this.metrics.addLatencyAndThroughputSensor(taskName,
-                                                                    metricScope,
-                                                                    name,
-                                                                    "range",
-                                                                    Sensor.RecordingLevel.DEBUG,
-                                                                    tagKey, taskName);
-        this.flushTime = this.metrics.addLatencyAndThroughputSensor(taskName,
-                                                                    metricScope,
-                                                                    name,
-                                                                    "flush",
-                                                                    Sensor.RecordingLevel.DEBUG,
-                                                                    tagKey, taskName);
-        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(taskName,
-                                                                              metricScope,
-                                                                              name,
-                                                                              "restore",
-                                                                              Sensor.RecordingLevel.DEBUG,
-                                                                              tagKey, taskName);
-
-        // register and possibly restore the state from the logs
-        if (restoreTime.shouldRecord()) {
-            measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    inner.init(InnerMeteredKeyValueStore.this.context, InnerMeteredKeyValueStore.this.root);
-                    return null;
-                }
-            }, restoreTime);
-        } else {
-            inner.init(InnerMeteredKeyValueStore.this.context, InnerMeteredKeyValueStore.this.root);
-        }
-    }
-
-    @Override
-    public long approximateNumEntries() {
-        return inner.approximateNumEntries();
-    }
-
-    private interface Action<V> {
-        V execute();
-    }
-
-    @Override
-    public V get(final K key) {
-        try {
-            if (getTime.shouldRecord()) {
-                return measureLatency(new Action<V>() {
-                    @Override
-                    public V execute() {
-                        return typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
-                    }
-                }, getTime);
-            } else {
-                return typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
-            }
-        } catch (final ProcessorStateException e) {
-            final String message = String.format(e.getMessage(), key);
-            throw new ProcessorStateException(message, e);
-        }
-    }
-
-    @Override
-    public void put(final K key, final V value) {
-        try {
-            if (putTime.shouldRecord()) {
-                measureLatency(new Action<V>() {
-                    @Override
-                    public V execute() {
-                        inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value));
-                        return null;
-                    }
-                }, putTime);
-            } else {
-                inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value));
-            }
-        } catch (final ProcessorStateException e) {
-            final String message = String.format(e.getMessage(), key, value);
-            throw new ProcessorStateException(message, e);
-        }
-    }
-
-    @Override
-    public V putIfAbsent(final K key, final V value) {
-        if (putIfAbsentTime.shouldRecord()) {
-            return measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    return typeConverter.outerValue(inner.putIfAbsent(typeConverter.innerKey(key), typeConverter.innerValue(value)));
-                }
-            }, putIfAbsentTime);
-        } else {
-            return typeConverter.outerValue(inner.putIfAbsent(typeConverter.innerKey(key), typeConverter.innerValue(value)));
-        }
-
-    }
-
-    @Override
-    public void putAll(final List<KeyValue<K, V>> entries) {
-        if (putAllTime.shouldRecord()) {
-            measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    inner.putAll(typeConverter.innerEntries(entries));
-                    return null;
-                }
-            }, putAllTime);
-        } else {
-            inner.putAll(typeConverter.innerEntries(entries));
-        }
-    }
-
-    @Override
-    public V delete(final K key) {
-        try {
-            if (deleteTime.shouldRecord()) {
-                return measureLatency(new Action<V>() {
-                    @Override
-                    public V execute() {
-                        return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
-                    }
-                }, deleteTime);
-            } else {
-                return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
-            }
-        } catch (final ProcessorStateException e) {
-            final String message = String.format(e.getMessage(), key);
-            throw new ProcessorStateException(message, e);
-        }
-    }
-
-    @Override
-    public KeyValueIterator<K, V> range(K from, K to) {
-        return new MeteredKeyValueIterator(this.inner.range(typeConverter.innerKey(from), typeConverter.innerKey(to)), this.rangeTime);
-    }
-
-    @Override
-    public KeyValueIterator<K, V> all() {
-        return new MeteredKeyValueIterator(this.inner.all(), this.allTime);
-    }
-
-    @Override
-    public void flush() {
-        if (flushTime.shouldRecord()) {
-            measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    inner.flush();
-                    return null;
-                }
-            }, flushTime);
-        } else {
-            inner.flush();
-        }
-
-    }
-
-    private V measureLatency(final Action<V> action, final Sensor sensor) {
-        final long startNs = time.nanoseconds();
-        try {
-            return action.execute();
-        } finally {
-            metrics.recordLatency(sensor, startNs, time.nanoseconds());
-        }
-    }
-
-    private class MeteredKeyValueIterator implements KeyValueIterator<K, V> {
-
-        private final KeyValueIterator<IK, IV> iter;
-        private final Sensor sensor;
-        private final long startNs;
-
-        MeteredKeyValueIterator(KeyValueIterator<IK, IV> iter, Sensor sensor) {
-            this.iter = iter;
-            this.sensor = sensor;
-            this.startNs = time.nanoseconds();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return iter.hasNext();
-        }
-
-        @Override
-        public KeyValue<K, V> next() {
-            return typeConverter.outerKeyValue(iter.next());
-        }
-
-        @Override
-        public void remove() {
-            iter.remove();
-        }
-
-        @Override
-        public void close() {
-            try {
-                iter.close();
-            } finally {
-                metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
-            }
-        }
-
-        @Override
-        public K peekNextKey() {
-            return typeConverter.outerKey(iter.peekNextKey());
-        }
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
index 20230f9..c790ee9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
@@ -27,7 +27,6 @@ public class KeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, KeyVa
 
     private final KeyValueBytesStoreSupplier storeSupplier;
 
-
     public KeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSupplier,
                                 final Serde<K> keySerde,
                                 final Serde<V> valueSerde,
@@ -39,11 +38,12 @@ public class KeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, KeyVa
 
     @Override
     public KeyValueStore<K, V> build() {
-        return new MeteredKeyValueBytesStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
-                                               storeSupplier.metricsScope(),
-                                               time,
-                                               keySerde,
-                                               valueSerde);
+        return new MeteredKeyValueStore<>(
+            maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+            storeSupplier.metricsScope(),
+            time,
+            keySerde,
+            valueSerde);
     }
 
     private KeyValueStore<Bytes, byte[]> maybeWrapCaching(final KeyValueStore<Bytes, byte[]> inner) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
deleted file mode 100644
index 3a30c10..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A Metered {@link KeyValueStore} wrapper that is used for recording operation metrics, and hence its
- * inner KeyValueStore implementation do not need to provide its own metrics collecting functionality.
- * The inner {@link KeyValueStore} of this class is of type &lt;Bytes,byte[]&gt;, hence we use {@link Serde}s
- * to convert from &lt;K,V&gt; to &lt;Bytes,byte[]&gt;
- * @param <K>
- * @param <V>
- */
-public class MeteredKeyValueBytesStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V> {
-
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
-    private StateSerdes<K, V> serdes;
-    private final InnerMeteredKeyValueStore<K, Bytes, V, byte[]> innerMetered;
-
-    // always wrap the store with the metered store
-    public MeteredKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> inner,
-                                     final String metricScope,
-                                     final Time time,
-                                     final Serde<K> keySerde,
-                                     final Serde<V> valueSerde) {
-        super(inner);
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
-        innerMetered = new InnerMeteredKeyValueStore<>(inner, metricScope, new InnerMeteredKeyValueStore.TypeConverter<K, Bytes, V, byte[]>() {
-            @Override
-            public Bytes innerKey(final K key) {
-                return Bytes.wrap(serdes.rawKey(key));
-            }
-
-            @Override
-            public byte[] innerValue(final V value) {
-                // do not check on null, but rely on user serde to handle it
-                return serdes.rawValue(value);
-            }
-
-            @Override
-            public List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K, V>> from) {
-                final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
-                for (KeyValue<K, V> entry : from) {
-                    byteEntries.add(KeyValue.pair(innerKey(entry.key), serdes.rawValue(entry.value)));
-
-                }
-                return byteEntries;
-            }
-
-            @Override
-            public V outerValue(final byte[] value) {
-                return value == null ? null : serdes.valueFrom(value);
-            }
-
-            @Override
-            public KeyValue<K, V> outerKeyValue(final KeyValue<Bytes, byte[]> keyValue) {
-                return KeyValue.pair(serdes.keyFrom(keyValue.key.get()), keyValue.value == null ? null : serdes.valueFrom(keyValue.value));
-            }
-
-            @Override
-            public K outerKey(final Bytes key) {
-                return serdes.keyFrom(key.get());
-            }
-        }, time);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void init(ProcessorContext context, StateStore root) {
-        this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
-                                        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                                        valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-        innerMetered.init(context, root);
-    }
-
-    @Override
-    public long approximateNumEntries() {
-        return innerMetered.approximateNumEntries();
-    }
-
-    @Override
-    public V get(final K key) {
-        return innerMetered.get(key);
-    }
-
-    @Override
-    public void put(final K key, final V value) {
-        innerMetered.put(key, value);
-    }
-
-    @Override
-    public V putIfAbsent(final K key, final V value) {
-        return innerMetered.putIfAbsent(key, value);
-    }
-
-
-    @Override
-    public void putAll(final List<KeyValue<K, V>> entries) {
-        innerMetered.putAll(entries);
-    }
-
-    @Override
-    public V delete(final K key) {
-        return innerMetered.delete(key);
-    }
-
-    @Override
-    public KeyValueIterator<K, V> range(K from, K to) {
-        return innerMetered.range(from, to);
-    }
-
-
-    @Override
-    public KeyValueIterator<K, V> all() {
-        return innerMetered.all();
-    }
-
-    @Override
-    public void flush() {
-        innerMetered.flush();
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 7d43165..fd79543 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -16,114 +16,327 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Metered {@link KeyValueStore} wrapper is used for recording operation metrics, and hence its
+ * A Metered {@link KeyValueStore} wrapper that is used for recording operation metrics, and hence its
  * inner KeyValueStore implementation do not need to provide its own metrics collecting functionality.
- *
+ * The inner {@link KeyValueStore} of this class is of type &lt;Bytes,byte[]&gt;, hence we use {@link Serde}s
+ * to convert from &lt;K,V&gt; to &lt;Bytes,byte[]&gt;
  * @param <K>
  * @param <V>
  */
 public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V> {
 
+    private final KeyValueStore<Bytes, byte[]> inner;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+    private StateSerdes<K, V> serdes;
+
+    private final String metricScope;
     protected final Time time;
-    private final InnerMeteredKeyValueStore<K, K, V, V> innerMetered;
+    private Sensor putTime;
+    private Sensor putIfAbsentTime;
+    private Sensor getTime;
+    private Sensor deleteTime;
+    private Sensor putAllTime;
+    private Sensor allTime;
+    private Sensor rangeTime;
+    private Sensor flushTime;
+    private StreamsMetricsImpl metrics;
 
-    // always wrap the store with the metered store
-    public MeteredKeyValueStore(final KeyValueStore<K, V> inner,
-                                final String metricScope,
-                                final Time time) {
+    MeteredKeyValueStore(final KeyValueStore<Bytes, byte[]> inner,
+                         final String metricScope,
+                         final Time time,
+                         final Serde<K> keySerde,
+                         final Serde<V> valueSerde) {
         super(inner);
+        this.inner = inner;
+        this.metricScope = metricScope;
         this.time = time != null ? time : Time.SYSTEM;
-        this.innerMetered = new InnerMeteredKeyValueStore<>(inner, metricScope, new InnerMeteredKeyValueStore.TypeConverter<K, K, V, V>() {
-            @Override
-            public K innerKey(final K key) {
-                return key;
-            }
-
-            @Override
-            public V innerValue(final V value) {
-                return value;
-            }
-
-            @Override
-            public List<KeyValue<K, V>> innerEntries(final List<KeyValue<K, V>> from) {
-                return from;
-            }
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+    }
 
-            @Override
-            public V outerValue(final V value) {
-                return value;
-            }
+    @SuppressWarnings("unchecked")
+    @Override
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
+        final String name = name();
+        final String tagKey = "task-id";
+        final String taskName = context.taskId().toString();
 
-            @Override
-            public KeyValue<K, V> outerKeyValue(final KeyValue<K, V> from) {
-                return from;
-            }
+        this.serdes = new StateSerdes<>(
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
-            @Override
-            public K outerKey(final K key) {
-                return key;
-            }
-        }, time);
-    }
+        this.metrics = (StreamsMetricsImpl) context.metrics();
+        this.putTime = this.metrics.addLatencyAndThroughputSensor(
+            taskName,
+            metricScope,
+            name,
+            "put",
+            Sensor.RecordingLevel.DEBUG,
+            tagKey, taskName);
+        this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(
+            taskName,
+            metricScope,
+            name,
+            "put-if-absent",
+            Sensor.RecordingLevel.DEBUG,
+            tagKey, taskName);
+        this.getTime = this.metrics.addLatencyAndThroughputSensor(
+            taskName,
+            metricScope,
+            name,
+            "get",
+            Sensor.RecordingLevel.DEBUG,
+            tagKey, taskName);
+        this.deleteTime = this.metrics.addLatencyAndThroughputSensor(
+            taskName,
+            metricScope,
+            name,
+            "delete",
+            Sensor.RecordingLevel.DEBUG,
+            tagKey, taskName);
+        this.putAllTime = this.metrics.addLatencyAndThroughputSensor(
+            taskName,
+            metricScope,
+            name,
+            "put-all",
+            Sensor.RecordingLevel.DEBUG,
+            tagKey, taskName);
+        this.allTime = this.metrics.addLatencyAndThroughputSensor(
+            taskName,
+            metricScope,
+            name,
+            "all",
+            Sensor.RecordingLevel.DEBUG,
+            tagKey, taskName);
+        this.rangeTime = this.metrics.addLatencyAndThroughputSensor(
+            taskName,
+            metricScope,
+            name,
+            "range",
+            Sensor.RecordingLevel.DEBUG,
+            tagKey, taskName);
+        this.flushTime = this.metrics.addLatencyAndThroughputSensor(
+            taskName,
+            metricScope,
+            name,
+            "flush",
+            Sensor.RecordingLevel.DEBUG,
+            tagKey, taskName);
+        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(
+            taskName,
+            metricScope,
+            name,
+            "restore",
+            Sensor.RecordingLevel.DEBUG,
+            tagKey, taskName);
 
-    @Override
-    public void init(ProcessorContext context, StateStore root) {
-        innerMetered.init(context, root);
+        // register and possibly restore the state from the logs
+        if (restoreTime.shouldRecord()) {
+            measureLatency(
+                () -> {
+                    inner.init(context, root);
+                    return null;
+                },
+                restoreTime);
+        } else {
+            inner.init(context, root);
+        }
     }
 
     @Override
     public long approximateNumEntries() {
-        return innerMetered.approximateNumEntries();
+        return inner.approximateNumEntries();
     }
 
     @Override
     public V get(final K key) {
-        return innerMetered.get(key);
+        try {
+            if (getTime.shouldRecord()) {
+                return measureLatency(() -> outerValue(inner.get(Bytes.wrap(serdes.rawKey(key)))), getTime);
+            } else {
+                return outerValue(inner.get(Bytes.wrap(serdes.rawKey(key))));
+            }
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
     }
 
     @Override
-    public void put(final K key, final V value) {
-        innerMetered.put(key, value);
+    public void put(final K key,
+                    final V value) {
+        try {
+            if (putTime.shouldRecord()) {
+                measureLatency(() -> {
+                    inner.put(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value));
+                    return null;
+                }, putTime);
+            } else {
+                inner.put(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value));
+            }
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key, value);
+            throw new ProcessorStateException(message, e);
+        }
     }
 
     @Override
-    public V putIfAbsent(final K key, final V value) {
-        return innerMetered.putIfAbsent(key, value);
+    public V putIfAbsent(final K key,
+                         final V value) {
+        if (putIfAbsentTime.shouldRecord()) {
+            return measureLatency(
+                () -> outerValue(inner.putIfAbsent(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value))),
+                putIfAbsentTime);
+        } else {
+            return outerValue(inner.putIfAbsent(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value)));
+        }
     }
 
     @Override
     public void putAll(final List<KeyValue<K, V>> entries) {
-        innerMetered.putAll(entries);
+        if (putAllTime.shouldRecord()) {
+            measureLatency(
+                () -> {
+                    inner.putAll(innerEntries(entries));
+                    return null;
+                },
+                putAllTime);
+        } else {
+            inner.putAll(innerEntries(entries));
+        }
     }
 
     @Override
     public V delete(final K key) {
-        return innerMetered.delete(key);
+        try {
+            if (deleteTime.shouldRecord()) {
+                return measureLatency(() -> outerValue(inner.delete(Bytes.wrap(serdes.rawKey(key)))), deleteTime);
+            } else {
+                return outerValue(inner.delete(Bytes.wrap(serdes.rawKey(key))));
+            }
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
     }
 
     @Override
-    public KeyValueIterator<K, V> range(K from, K to) {
-        return innerMetered.range(from, to);
+    public KeyValueIterator<K, V> range(final K from,
+                                        final K to) {
+        return new MeteredKeyValueIterator(
+            this.inner.range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to))),
+            this.rangeTime);
     }
 
     @Override
     public KeyValueIterator<K, V> all() {
-        return innerMetered.all();
+        return new MeteredKeyValueIterator(this.inner.all(), this.allTime);
     }
 
     @Override
     public void flush() {
-        innerMetered.flush();
+        if (flushTime.shouldRecord()) {
+            measureLatency(
+                () -> {
+                    inner.flush();
+                    return null;
+                },
+                flushTime);
+        } else {
+            inner.flush();
+        }
     }
 
+    private interface Action<V> {
+        V execute();
+    }
+
+    private V measureLatency(final Action<V> action,
+                             final Sensor sensor) {
+        final long startNs = time.nanoseconds();
+        try {
+            return action.execute();
+        } finally {
+            metrics.recordLatency(sensor, startNs, time.nanoseconds());
+        }
+    }
+
+    private V outerValue(final byte[] value) {
+        return value == null ? null : serdes.valueFrom(value);
+    }
+
+    private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K, V>> from) {
+        final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
+        for (final KeyValue<K, V> entry : from) {
+            byteEntries.add(KeyValue.pair(Bytes.wrap(serdes.rawKey(entry.key)), serdes.rawValue(entry.value)));
+        }
+        return byteEntries;
+    }
+
+    private class MeteredKeyValueIterator implements KeyValueIterator<K, V> {
+
+        private final KeyValueIterator<Bytes, byte[]> iter;
+        private final Sensor sensor;
+        private final long startNs;
+
+        private MeteredKeyValueIterator(final KeyValueIterator<Bytes, byte[]> iter,
+                                        final Sensor sensor) {
+            this.iter = iter;
+            this.sensor = sensor;
+            this.startNs = time.nanoseconds();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, V> next() {
+            final KeyValue<Bytes, byte[]> keyValue = iter.next();
+            return KeyValue.pair(
+                serdes.keyFrom(keyValue.key.get()),
+                keyValue.value == null ? null : serdes.valueFrom(keyValue.value));
+        }
+
+        @Override
+        public void remove() {
+            iter.remove();
+        }
+
+        @Override
+        public void close() {
+            try {
+                iter.close();
+            } finally {
+                metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
+            }
+        }
+
+        @Override
+        public K peekNextKey() {
+            return serdes.keyFrom(iter.peekNextKey().get());
+        }
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index 66d2899..38f966e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -34,19 +34,20 @@ public interface WrappedStateStore extends StateStore {
 
     /**
      * Return the state store this store directly wraps
-     * @return
+     * @return the state store this store directly wraps
      */
     StateStore wrappedStore();
 
     abstract class AbstractStateStore implements WrappedStateStore {
         final StateStore innerState;
 
-        AbstractStateStore(StateStore inner) {
+        AbstractStateStore(final StateStore inner) {
             this.innerState = inner;
         }
 
         @Override
-        public void init(ProcessorContext context, StateStore root) {
+        public void init(final ProcessorContext context,
+                         final StateStore root) {
             innerState.init(context, root);
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index fc243c6..3cb6d42 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -30,7 +30,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.CachedStateStore;
 import org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
-import org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore;
+import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
@@ -62,7 +62,7 @@ public class KeyValueStoreMaterializerTest {
         final KeyValueStore<String, String> store = builder.build();
         final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
         final StateStore logging = caching.wrappedStore();
-        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+        assertThat(store, instanceOf(MeteredKeyValueStore.class));
         assertThat(caching, instanceOf(CachedStateStore.class));
         assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 4b9e6a1..21049b1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
+import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
 import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.InternalMockProcessorContext;
@@ -54,7 +55,7 @@ import java.util.Set;
  * A component that provides a {@link #context() ProcessingContext} that can be supplied to a {@link KeyValueStore} so that
  * all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes.
  * This class simplifies testing of various {@link KeyValueStore} instances, especially those that use
- * {@link org.apache.kafka.streams.state.internals.MeteredKeyValueStore} to monitor and write its entries to the Kafka topic.
+ * {@link MeteredKeyValueStore} to monitor and write its entries to the Kafka topic.
  *
  * <h2>Basic usage</h2>
  * This component can be used to help test a {@link KeyValueStore}'s ability to read and write entries.
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java
index 2d378d8..848bba6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java
@@ -47,7 +47,7 @@ public class KeyValueStoreBuilderTest {
     private KeyValueStoreBuilder<String, String> builder;
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         EasyMock.expect(supplier.get()).andReturn(inner);
         EasyMock.expect(supplier.name()).andReturn("name");
         EasyMock.replay(supplier);
@@ -62,13 +62,13 @@ public class KeyValueStoreBuilderTest {
     @Test
     public void shouldHaveMeteredStoreAsOuterStore() {
         final KeyValueStore<String, String> store = builder.build();
-        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+        assertThat(store, instanceOf(MeteredKeyValueStore.class));
     }
 
     @Test
     public void shouldHaveChangeLoggingStoreByDefault() {
         final KeyValueStore<String, String> store = builder.build();
-        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+        assertThat(store, instanceOf(MeteredKeyValueStore.class));
         final StateStore next = ((WrappedStateStore) store).wrappedStore();
         assertThat(next, instanceOf(ChangeLoggingKeyValueBytesStore.class));
     }
@@ -77,42 +77,43 @@ public class KeyValueStoreBuilderTest {
     public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
         final KeyValueStore<String, String> store = builder.withLoggingDisabled().build();
         final StateStore next = ((WrappedStateStore) store).wrappedStore();
-        assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
+        assertThat(next, CoreMatchers.equalTo(inner));
     }
 
     @Test
     public void shouldHaveCachingStoreWhenEnabled() {
         final KeyValueStore<String, String> store = builder.withCachingEnabled().build();
         final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
-        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+        assertThat(store, instanceOf(MeteredKeyValueStore.class));
         assertThat(wrapped, instanceOf(CachingKeyValueStore.class));
     }
 
     @Test
     public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
         final KeyValueStore<String, String> store = builder
-                .withLoggingEnabled(Collections.<String, String>emptyMap())
+                .withLoggingEnabled(Collections.emptyMap())
                 .build();
         final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
-        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+        assertThat(store, instanceOf(MeteredKeyValueStore.class));
         assertThat(wrapped, instanceOf(ChangeLoggingKeyValueBytesStore.class));
-        assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+        assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.equalTo(inner));
     }
 
     @Test
     public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
         final KeyValueStore<String, String> store = builder
-                .withLoggingEnabled(Collections.<String, String>emptyMap())
+                .withLoggingEnabled(Collections.emptyMap())
                 .withCachingEnabled()
                 .build();
         final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
         final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore();
-        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+        assertThat(store, instanceOf(MeteredKeyValueStore.class));
         assertThat(caching, instanceOf(CachingKeyValueStore.class));
         assertThat(changeLogging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
-        assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+        assertThat(changeLogging.wrappedStore(), CoreMatchers.equalTo(inner));
     }
 
+    @SuppressWarnings("all")
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerIfInnerIsNull() {
         new KeyValueStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
similarity index 92%
rename from streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
rename to streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index a5e0d79..cd94c8c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -51,7 +51,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(EasyMockRunner.class)
-public class MeteredKeyValueBytesStoreTest {
+public class MeteredKeyValueStoreTest {
 
     private final TaskId taskId = new TaskId(0, 0);
     private final Map<String, String> tags = mkMap(
@@ -64,7 +64,7 @@ public class MeteredKeyValueBytesStoreTest {
     @Mock(type = MockType.NICE)
     private ProcessorContext context;
 
-    private MeteredKeyValueBytesStore<String, String> metered;
+    private MeteredKeyValueStore<String, String> metered;
     private final String key = "key";
     private final Bytes keyBytes = Bytes.wrap(key.getBytes());
     private final String value = "value";
@@ -73,7 +73,7 @@ public class MeteredKeyValueBytesStoreTest {
 
     @Before
     public void before() {
-        metered = new MeteredKeyValueBytesStore<>(
+        metered = new MeteredKeyValueStore<>(
             inner,
             "scope",
             new MockTime(),
@@ -102,7 +102,7 @@ public class MeteredKeyValueBytesStoreTest {
 
         final KafkaMetric metric = metric("put-rate");
 
-        assertTrue(metric.value() > 0);
+        assertTrue((Double) metric.metricValue() > 0);
         EasyMock.verify(inner);
     }
 
@@ -115,7 +115,7 @@ public class MeteredKeyValueBytesStoreTest {
         assertThat(metered.get(key), equalTo(value));
 
         final KafkaMetric metric = metric("get-rate");
-        assertTrue(metric.value() > 0);
+        assertTrue((Double) metric.metricValue() > 0);
         EasyMock.verify(inner);
     }
 
@@ -128,7 +128,7 @@ public class MeteredKeyValueBytesStoreTest {
         metered.putIfAbsent(key, value);
 
         final KafkaMetric metric = metric("put-if-absent-rate");
-        assertTrue(metric.value() > 0);
+        assertTrue((Double) metric.metricValue() > 0);
         EasyMock.verify(inner);
     }
 
@@ -147,7 +147,7 @@ public class MeteredKeyValueBytesStoreTest {
         metered.putAll(Collections.singletonList(KeyValue.pair(key, value)));
 
         final KafkaMetric metric = metric("put-all-rate");
-        assertTrue(metric.value() > 0);
+        assertTrue((Double) metric.metricValue() > 0);
         EasyMock.verify(inner);
     }
 
@@ -160,7 +160,7 @@ public class MeteredKeyValueBytesStoreTest {
         metered.delete(key);
 
         final KafkaMetric metric = metric("delete-rate");
-        assertTrue(metric.value() > 0);
+        assertTrue((Double) metric.metricValue() > 0);
         EasyMock.verify(inner);
     }
 
@@ -177,7 +177,7 @@ public class MeteredKeyValueBytesStoreTest {
         iterator.close();
 
         final KafkaMetric metric = metric("range-rate");
-        assertTrue(metric.value() > 0);
+        assertTrue((Double) metric.metricValue() > 0);
         EasyMock.verify(inner);
     }
 
@@ -194,7 +194,7 @@ public class MeteredKeyValueBytesStoreTest {
         iterator.close();
 
         final KafkaMetric metric = metric(new MetricName("all-rate", "stream-scope-metrics", "", tags));
-        assertTrue(metric.value() > 0);
+        assertTrue((Double) metric.metricValue() > 0);
         EasyMock.verify(inner);
     }
 
@@ -207,7 +207,7 @@ public class MeteredKeyValueBytesStoreTest {
         metered.flush();
 
         final KafkaMetric metric = metric("flush-rate");
-        assertTrue(metric.value() > 0);
+        assertTrue((Double) metric.metricValue() > 0);
         EasyMock.verify(inner);
     }
 


Mime
View raw message