kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] 01/02: KAFKA-9355: Fix bug that removed RocksDB metrics after failure in EOS (#7996)
Date Thu, 13 Feb 2020 23:02:19 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a19a15ffa07a26066b5ad560a29965c708de909c
Author: Bruno Cadonna <bruno@confluent.io>
AuthorDate: Wed Feb 12 02:31:13 2020 +0100

    KAFKA-9355: Fix bug that removed RocksDB metrics after failure in EOS (#7996)
    
    * Added init() method to RocksDBMetricsRecorder
    * Added call to init() of RocksDBMetricsRecorder to init() of RocksDB store
    * Added call to init() of RocksDBMetricsRecorder to openExisting() of segmented state stores
    * Adapted unit tests
    * Added integration test that reproduces the situation in which the bug occurred
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../streams/state/internals/KeyValueSegments.java  |   8 +-
 .../streams/state/internals/RocksDBStore.java      |   3 +-
 .../state/internals/TimestampedSegments.java       |   6 +
 .../internals/metrics/RocksDBMetricsRecorder.java  |  31 ++-
 .../integration/MetricsIntegrationTest.java        |  24 +-
 .../integration/RocksDBMetricsIntegrationTest.java | 295 +++++++++++++++++++++
 .../state/internals/KeyValueSegmentsTest.java      |   2 +
 .../streams/state/internals/RocksDBStoreTest.java  |   8 +-
 .../state/internals/SegmentIteratorTest.java       |   4 +-
 .../state/internals/TimestampedSegmentsTest.java   |   2 +
 .../metrics/RocksDBMetricsRecorderTest.java        | 145 ++++++----
 11 files changed, 427 insertions(+), 101 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
index fc49c12..9dbbae4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
@@ -51,4 +51,10 @@ class KeyValueSegments extends AbstractSegments<KeyValueSegment>
{
             return newSegment;
         }
     }
-}
+
+    @Override
+    public void openExisting(final InternalProcessorContext context, final long streamTime) {
+        metricsRecorder.init(context.metrics(), context.taskId());
+        super.openExisting(context, streamTime);
+    }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 96ffe3b..2b9b3f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -203,7 +203,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>,
BulkLoadingSt
             // metrics recorder will clean up statistics object
             final Statistics statistics = new Statistics();
             userSpecifiedOptions.setStatistics(statistics);
-            metricsRecorder.addStatistics(name, statistics, (StreamsMetricsImpl) context.metrics(),
context.taskId());
+            metricsRecorder.addStatistics(name, statistics);
         }
     }
 
@@ -225,6 +225,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>,
BulkLoadingSt
                      final StateStore root) {
         // open the DB dir
         internalProcessorContext = context;
+        metricsRecorder.init((StreamsMetricsImpl) context.metrics(), context.taskId());
         openDB(context);
         batchingStateRestoreCallback = new RocksDBBatchingRestoreCallback(this);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
index 400511f..e7c2edb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
@@ -51,4 +51,10 @@ class TimestampedSegments extends AbstractSegments<TimestampedSegment>
{
             return newSegment;
         }
     }
+
+    @Override
+    public void openExisting(final InternalProcessorContext context, final long streamTime) {
+        metricsRecorder.init(context.metrics(), context.taskId());
+        super.openExisting(context, streamTime);
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
index 59ade54..b5d603c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
@@ -51,7 +51,6 @@ public class RocksDBMetricsRecorder {
     private final String threadId;
     private TaskId taskId;
     private StreamsMetricsImpl streamsMetrics;
-    private boolean isInitialized = false;
 
     public RocksDBMetricsRecorder(final String metricsScope,
                                   final String threadId,
@@ -71,20 +70,26 @@ public class RocksDBMetricsRecorder {
         return taskId;
     }
 
-    public void addStatistics(final String segmentName,
-                              final Statistics statistics,
-                              final StreamsMetricsImpl streamsMetrics,
-                              final TaskId taskId) {
-        if (!isInitialized) {
-            initSensors(streamsMetrics, taskId);
-            this.taskId = taskId;
-            this.streamsMetrics = streamsMetrics;
-            isInitialized = true;
+    /**
+     * The initialisation of the metrics recorder is idempotent.
+     */
+    public void init(final StreamsMetricsImpl streamsMetrics,
+                     final TaskId taskId) {
+        if (this.taskId != null && !this.taskId.equals(taskId)) {
+            throw new IllegalStateException("Metrics recorder is re-initialised with different task: previous task is " +
+                this.taskId + " whereas current task is " + taskId + ". This is a bug in Kafka Streams.");
         }
-        if (this.taskId != taskId) {
-            throw new IllegalStateException("Statistics of store \"" + segmentName + "\"
for task " + taskId
-                + " cannot be added to metrics recorder for task " + this.taskId + ". This
is a bug in Kafka Streams.");
+        if (this.streamsMetrics != null && this.streamsMetrics != streamsMetrics) {
+            throw new IllegalStateException("Metrics recorder is re-initialised with different Streams metrics. "
+                + "This is a bug in Kafka Streams.");
         }
+        initSensors(streamsMetrics, taskId);
+        this.taskId = taskId;
+        this.streamsMetrics = streamsMetrics;
+    }
+
+    public void addStatistics(final String segmentName,
+                              final Statistics statistics) {
         if (statisticsToRecord.isEmpty()) {
             logger.debug(
                 "Adding metrics recorder of task {} to metrics recording trigger",
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 010f277..74bfb97 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -59,7 +59,7 @@ import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertTrue;
+
 
 @SuppressWarnings("unchecked")
 @Category({IntegrationTest.class})
@@ -393,28 +393,6 @@ public class MetricsIntegrationTest {
         checkMetricsDeregistration();
     }
 
-    @Test
-    public void shouldNotAddRocksDBMetricsIfRecordingLevelIsInfo() throws Exception {
-        builder.table(
-            STREAM_INPUT,
-            Materialized.as(Stores.persistentKeyValueStore(MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled()
-        ).toStream().to(STREAM_OUTPUT_1);
-        streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.name);
-        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-        kafkaStreams.start();
-        TestUtils.waitForCondition(
-            () -> kafkaStreams.state() == State.RUNNING,
-            timeout,
-            () -> "Kafka Streams application did not reach state RUNNING in " + timeout
+ " ms");
-
-        final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
-            .filter(m -> m.metricName().group().equals("stream-state-metrics") &&
m.metricName().tags().containsKey("rocksdb-state-id"))
-            .collect(Collectors.toList());
-        assertTrue(listMetricStore.isEmpty());
-
-        closeApplication();
-    }
-
     private void verifyStateMetric(final State state) {
         final List<Metric> metricsList = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
             .filter(m -> m.metricName().name().equals(STATE) &&
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
new file mode 100644
index 0000000..b0ac1fa
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+@Category({IntegrationTest.class})
+@RunWith(Parameterized.class)
+public class RocksDBMetricsIntegrationTest {
+
+    private static final int NUM_BROKERS = 3;
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    private static final String STREAM_INPUT = "STREAM_INPUT";
+    private static final String STREAM_OUTPUT = "STREAM_OUTPUT";
+    private static final String MY_STORE_PERSISTENT_KEY_VALUE = "myStorePersistentKeyValue";
+    private static final Duration WINDOW_SIZE = Duration.ofMillis(50);
+
+    // RocksDB metrics
+    private static final String BYTES_WRITTEN_RATE = "bytes-written-rate";
+    private static final String BYTES_WRITTEN_TOTAL = "bytes-written-total";
+    private static final String BYTES_READ_RATE = "bytes-read-rate";
+    private static final String BYTES_READ_TOTAL = "bytes-read-total";
+    private static final String MEMTABLE_BYTES_FLUSHED_RATE = "memtable-bytes-flushed-rate";
+    private static final String MEMTABLE_BYTES_FLUSHED_TOTAL = "memtable-bytes-flushed-total";
+    private static final String MEMTABLE_HIT_RATIO = "memtable-hit-ratio";
+    private static final String WRITE_STALL_DURATION_AVG = "write-stall-duration-avg";
+    private static final String WRITE_STALL_DURATION_TOTAL = "write-stall-duration-total";
+    private static final String BLOCK_CACHE_DATA_HIT_RATIO = "block-cache-data-hit-ratio";
+    private static final String BLOCK_CACHE_INDEX_HIT_RATIO = "block-cache-index-hit-ratio";
+    private static final String BLOCK_CACHE_FILTER_HIT_RATIO = "block-cache-filter-hit-ratio";
+    private static final String BYTES_READ_DURING_COMPACTION_RATE = "bytes-read-compaction-rate";
+    private static final String BYTES_WRITTEN_DURING_COMPACTION_RATE = "bytes-written-compaction-rate";
+    private static final String NUMBER_OF_OPEN_FILES = "number-open-files";
+    private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors-total";
+
+    @Parameters(name = "{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+            {StreamsConfig.EXACTLY_ONCE},
+            {StreamsConfig.AT_LEAST_ONCE}
+        });
+    }
+
+    @Parameter
+    public String processingGuarantee;
+
+    @Before
+    public void before() throws Exception {
+        CLUSTER.createTopic(STREAM_INPUT, 1, 3);
+    }
+
+    @After
+    public void after() throws Exception {
+        CLUSTER.deleteTopicsAndWait(STREAM_INPUT, STREAM_OUTPUT);
+    }
+
+    @Test
+    public void shouldExposeRocksDBMetricsForNonSegmentedStateStoreBeforeAndAfterFailureWithEmptyStateDir()
throws Exception {
+        final Properties streamsConfiguration = streamsConfig();
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+        final StreamsBuilder builder = builderForNonSegmentedStateStore();
+
+        cleanUpStateRunAndVerify(
+            builder,
+            streamsConfiguration,
+            IntegerDeserializer.class,
+            StringDeserializer.class,
+            "rocksdb-state-id"
+        );
+
+        cleanUpStateRunAndVerify(
+            builder,
+            streamsConfiguration,
+            IntegerDeserializer.class,
+            StringDeserializer.class,
+            "rocksdb-state-id"
+        );
+    }
+
+    @Test
+    public void shouldExposeRocksDBMetricsForSegmentedStateStoreBeforeAndAfterFailureWithEmptyStateDir()
throws Exception {
+        final Properties streamsConfiguration = streamsConfig();
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+        final StreamsBuilder builder = builderForSegmentedStateStore();
+
+        cleanUpStateRunAndVerify(
+            builder,
+            streamsConfiguration,
+            LongDeserializer.class,
+            LongDeserializer.class,
+            "rocksdb-window-state-id"
+        );
+
+        cleanUpStateRunAndVerify(
+            builder,
+            streamsConfiguration,
+            LongDeserializer.class,
+            LongDeserializer.class,
+            "rocksdb-window-state-id"
+        );
+    }
+
+    private Properties streamsConfig() {
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024
* 1024L);
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        return streamsConfiguration;
+    }
+
+    private StreamsBuilder builderForNonSegmentedStateStore() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table(
+            STREAM_INPUT,
+            Materialized.as(Stores.persistentKeyValueStore(MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled()
+        ).toStream().to(STREAM_OUTPUT);
+        return builder;
+    }
+
+    private StreamsBuilder builderForSegmentedStateStore() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .groupByKey()
+            .windowedBy(TimeWindows.of(WINDOW_SIZE).grace(Duration.ZERO))
+            .aggregate(() -> 0L,
+                (aggKey, newValue, aggValue) -> aggValue,
+                Materialized.<Integer, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store")
+                    .withValueSerde(Serdes.Long())
+                    .withRetention(WINDOW_SIZE))
+            .toStream()
+            .map((key, value) -> KeyValue.pair(value, value))
+            .to(STREAM_OUTPUT, Produced.with(Serdes.Long(), Serdes.Long()));
+        return builder;
+    }
+
+    private void cleanUpStateRunAndVerify(final StreamsBuilder builder,
+                                          final Properties streamsConfiguration,
+                                          final Class outputKeyDeserializer,
+                                          final Class outputValueDeserializer,
+                                          final String metricsScope) throws Exception {
+        final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
+        kafkaStreams.cleanUp();
+        produceRecords();
+
+        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams, 60000);
+
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                "consumerApp",
+                outputKeyDeserializer,
+                outputValueDeserializer,
+                new Properties()
+            ),
+            STREAM_OUTPUT,
+            1
+        );
+        verifyRocksDBMetrics(kafkaStreams, metricsScope);
+        kafkaStreams.close();
+    }
+
+    private void produceRecords() throws Exception {
+        final MockTime mockTime = new MockTime(WINDOW_SIZE.toMillis());
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            STREAM_INPUT,
+            Collections.singletonList(new KeyValue<>(1, "A")),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class,
+                new Properties()
+            ),
+            mockTime.milliseconds()
+        );
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            STREAM_INPUT,
+            Collections.singletonList(new KeyValue<>(1, "B")),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class,
+                new Properties()
+            ),
+            mockTime.milliseconds()
+        );
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            STREAM_INPUT,
+            Collections.singletonList(new KeyValue<>(1, "C")),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class,
+                new Properties()
+            ),
+            mockTime.milliseconds()
+        );
+    }
+
+    private void verifyRocksDBMetrics(final KafkaStreams kafkaStreams, final String metricsScope)
{
+        final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+            .filter(m -> m.metricName().group().equals("stream-state-metrics") &&
m.metricName().tags().containsKey(metricsScope))
+            .collect(Collectors.toList());
+        checkMetricByName(listMetricStore, BYTES_WRITTEN_RATE, 1);
+        checkMetricByName(listMetricStore, BYTES_WRITTEN_TOTAL, 1);
+        checkMetricByName(listMetricStore, BYTES_READ_RATE, 1);
+        checkMetricByName(listMetricStore, BYTES_READ_TOTAL, 1);
+        checkMetricByName(listMetricStore, MEMTABLE_BYTES_FLUSHED_RATE, 1);
+        checkMetricByName(listMetricStore, MEMTABLE_BYTES_FLUSHED_TOTAL, 1);
+        checkMetricByName(listMetricStore, MEMTABLE_HIT_RATIO, 1);
+        checkMetricByName(listMetricStore, WRITE_STALL_DURATION_AVG, 1);
+        checkMetricByName(listMetricStore, WRITE_STALL_DURATION_TOTAL, 1);
+        checkMetricByName(listMetricStore, BLOCK_CACHE_DATA_HIT_RATIO, 1);
+        checkMetricByName(listMetricStore, BLOCK_CACHE_INDEX_HIT_RATIO, 1);
+        checkMetricByName(listMetricStore, BLOCK_CACHE_FILTER_HIT_RATIO, 1);
+        checkMetricByName(listMetricStore, BYTES_READ_DURING_COMPACTION_RATE, 1);
+        checkMetricByName(listMetricStore, BYTES_WRITTEN_DURING_COMPACTION_RATE, 1);
+        checkMetricByName(listMetricStore, NUMBER_OF_OPEN_FILES, 1);
+        checkMetricByName(listMetricStore, NUMBER_OF_FILE_ERRORS, 1);
+    }
+
+    private void checkMetricByName(final List<Metric> listMetric, final String metricName,
final int numMetric) {
+        final List<Metric> metrics = listMetric.stream()
+            .filter(m -> m.metricName().name().equals(metricName))
+            .collect(Collectors.toList());
+        Assert.assertEquals("Size of metrics of type:'" + metricName + "' must be equal to " + numMetric + " but it's equal to " + metrics.size(), numMetric, metrics.size());
+        for (final Metric m : metrics) {
+            Assert.assertNotNull("Metric:'" + m.metricName() + "' must be not null", m.metricValue());
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
index ddf6175..b052ecb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
@@ -63,6 +63,7 @@ public class KeyValueSegmentsTest {
             new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
         );
         segments = new KeyValueSegments(storeName, METRICS_SCOPE, RETENTION_PERIOD, SEGMENT_INTERVAL);
+        segments.openExisting(context, -1L);
     }
 
     @After
@@ -154,6 +155,7 @@ public class KeyValueSegmentsTest {
     @Test
     public void shouldOpenExistingSegments() {
         segments = new KeyValueSegments("test",  METRICS_SCOPE, 4, 1);
+        segments.openExisting(context, -1L);
         segments.getOrCreateSegmentIfLive(0, context, -1L);
         segments.getOrCreateSegmentIfLive(1, context, -1L);
         segments.getOrCreateSegmentIfLive(2, context, -1L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 0a28a77..36d53ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -98,12 +98,12 @@ public class RocksDBStoreTest {
     public void setUp() {
         final Properties props = StreamsTestUtils.getStreamsConfig();
         props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
-        rocksDBStore = getRocksDBStore();
         dir = TestUtils.tempDirectory();
         context = new InternalMockProcessorContext(dir,
             Serdes.String(),
             Serdes.String(),
             new StreamsConfig(props));
+        rocksDBStore = getRocksDBStore();
         context.metrics().setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger());
     }
 
@@ -150,9 +150,7 @@ public class RocksDBStoreTest {
         reset(metricsRecorder);
         metricsRecorder.addStatistics(
             eq(DB_NAME),
-            anyObject(Statistics.class),
-            eq(mockContext.metrics()),
-            eq(mockContext.taskId())
+            anyObject(Statistics.class)
         );
         replay(metricsRecorder);
 
@@ -286,7 +284,7 @@ public class RocksDBStoreTest {
     public void shouldCallRocksDbConfigSetter() {
         MockRocksDbConfigSetter.called = false;
 
-        rocksDBStore.openDB(context);
+        rocksDBStore.init(context, rocksDBStore);
 
         assertTrue(MockRocksDbConfigSetter.called);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 7944e5f..437c556 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -62,8 +62,8 @@ public class SegmentIteratorTest {
                     new LogContext("testCache "),
                     0,
                     new MockStreamsMetrics(new Metrics())));
-        segmentOne.openDB(context);
-        segmentTwo.openDB(context);
+        segmentOne.init(context, segmentOne);
+        segmentTwo.init(context, segmentTwo);
         segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes());
         segmentOne.put(Bytes.wrap("b".getBytes()), "2".getBytes());
         segmentTwo.put(Bytes.wrap("c".getBytes()), "3".getBytes());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
index 8d294ce..ce4652d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
@@ -63,6 +63,7 @@ public class TimestampedSegmentsTest {
             new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
         );
         segments = new TimestampedSegments(storeName, METRICS_SCOPE, RETENTION_PERIOD, SEGMENT_INTERVAL);
+        segments.openExisting(context, -1L);
     }
 
     @After
@@ -155,6 +156,7 @@ public class TimestampedSegmentsTest {
     @Test
     public void shouldOpenExistingSegments() {
         segments = new TimestampedSegments("test", METRICS_SCOPE, 4, 1);
+        segments.openExisting(context, -1L);
         segments.getOrCreateSegmentIfLive(0, context, -1L);
         segments.getOrCreateSegmentIfLive(1, context, -1L);
         segments.getOrCreateSegmentIfLive(2, context, -1L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
index f22c02e..fc60c27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.streams.state.internals.metrics;
 
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.RocksDBMetricContext;
@@ -35,11 +37,12 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.niceMock;
 import static org.easymock.EasyMock.resetToNice;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertThrows;
 import static org.powermock.api.easymock.PowerMock.reset;
 import static org.powermock.api.easymock.PowerMock.createMock;
 import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.mockStaticNice;
 import static org.powermock.api.easymock.PowerMock.replay;
 import static org.powermock.api.easymock.PowerMock.verify;
 
@@ -71,106 +74,110 @@ public class RocksDBMetricsRecorderTest {
     private final StreamsMetricsImpl streamsMetrics = niceMock(StreamsMetricsImpl.class);
     private final RocksDBMetricsRecordingTrigger recordingTrigger = mock(RocksDBMetricsRecordingTrigger.class);
     private final TaskId taskId1 = new TaskId(0, 0);
-    private final TaskId taskId2 = new TaskId(0, 2);
+    private final TaskId taskId2 = new TaskId(0, 1);
 
     private final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, THREAD_ID, STORE_NAME);
 
     @Before
     public void setUp() {
+        setUpMetricsStubMock();
         expect(streamsMetrics.rocksDBMetricsRecordingTrigger()).andStubReturn(recordingTrigger);
         replay(streamsMetrics);
+        recorder.init(streamsMetrics, taskId1);
     }
 
     @Test
-    public void shouldSetStatsLevelToExceptDetailedTimersWhenStatisticsIsAdded() {
-        mockStaticNice(RocksDBMetrics.class);
-        replay(RocksDBMetrics.class);
-        statisticsToAdd1.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS);
-        replay(statisticsToAdd1);
+    public void shouldInitMetricsRecorder() {
+        setUpMetricsMock();
 
-        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
+        recorder.init(streamsMetrics, taskId1);
 
-        verify(statisticsToAdd1);
+        verify(RocksDBMetrics.class);
+        assertThat(recorder.taskId(), is(taskId1));
     }
 
     @Test
-    public void shouldThrowIfTaskIdOfStatisticsToAddDiffersFromInitialisedOne() {
-        mockStaticNice(RocksDBMetrics.class);
-        replay(RocksDBMetrics.class);
-        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
+    public void shouldThrowIfMetricRecorderIsReInitialisedWithDifferentTask() {
+        setUpMetricsStubMock();
+        recorder.init(streamsMetrics, taskId1);
+
         assertThrows(
             IllegalStateException.class,
-            () -> recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2, streamsMetrics, taskId2)
+            () -> recorder.init(streamsMetrics, taskId2)
         );
     }
 
     @Test
-    public void shouldThrowIfStatisticsToAddHasBeenAlreadyAdded() {
-        mockStaticNice(RocksDBMetrics.class);
-        replay(RocksDBMetrics.class);
-        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
+    public void shouldThrowIfMetricRecorderIsReInitialisedWithDifferentStreamsMetrics() {
+        setUpMetricsStubMock();
+        recorder.init(streamsMetrics, taskId1);
 
         assertThrows(
             IllegalStateException.class,
-            () -> recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1)
+            () -> recorder.init(
+                new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST),
+                taskId1
+            )
         );
     }
 
     @Test
-    public void shouldInitMetricsAndAddItselfToRecordingTriggerOnlyWhenFirstStatisticsIsAdded() {
-        setUpMetricsMock();
-        recordingTrigger.addMetricsRecorder(recorder);
-        replay(recordingTrigger);
+    public void shouldSetStatsLevelToExceptDetailedTimersWhenStatisticsIsAdded() {
+        statisticsToAdd1.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS);
+        replay(statisticsToAdd1);
 
-        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
+        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
 
-        verify(recordingTrigger);
-        verify(RocksDBMetrics.class);
+        verify(statisticsToAdd1);
+    }
 
-        mockStatic(RocksDBMetrics.class);
-        replay(RocksDBMetrics.class);
-        reset(recordingTrigger);
+    @Test
+    public void shouldThrowIfStatisticsToAddHasBeenAlreadyAdded() {
+        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
+
+        assertThrows(
+            IllegalStateException.class,
+            () -> recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1)
+        );
+    }
+
+    @Test
+    public void shouldAddItselfToRecordingTriggerWhenFirstStatisticsIsAddedToNewlyCreatedRecorder() {
+        recordingTrigger.addMetricsRecorder(recorder);
         replay(recordingTrigger);
 
-        recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2, streamsMetrics, taskId1);
+        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
 
         verify(recordingTrigger);
-        verify(RocksDBMetrics.class);
     }
 
     @Test
-    public void shouldAddItselfToRecordingTriggerWhenEmptyButInitialised() {
-        mockStaticNice(RocksDBMetrics.class);
-        replay(RocksDBMetrics.class);
-        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
+    public void shouldAddItselfToRecordingTriggerWhenFirstStatisticsIsAddedAfterLastStatisticsWasRemoved() {
+        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
         recorder.removeStatistics(SEGMENT_STORE_NAME_1);
         reset(recordingTrigger);
         recordingTrigger.addMetricsRecorder(recorder);
         replay(recordingTrigger);
 
-        recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2, streamsMetrics, taskId1);
+        recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2);
 
         verify(recordingTrigger);
     }
 
     @Test
     public void shouldNotAddItselfToRecordingTriggerWhenNotEmpty() {
-        mockStaticNice(RocksDBMetrics.class);
-        replay(RocksDBMetrics.class);
-        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
+        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
         reset(recordingTrigger);
         replay(recordingTrigger);
 
-        recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2, streamsMetrics, taskId1);
+        recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2);
 
         verify(recordingTrigger);
     }
 
     @Test
     public void shouldCloseStatisticsWhenStatisticsIsRemoved() {
-        mockStaticNice(RocksDBMetrics.class);
-        replay(RocksDBMetrics.class);
-        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
+        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
         reset(statisticsToAdd1);
         statisticsToAdd1.close();
         replay(statisticsToAdd1);
@@ -182,10 +189,8 @@ public class RocksDBMetricsRecorderTest {
 
     @Test
     public void shouldRemoveItselfFromRecordingTriggerWhenLastStatisticsIsRemoved() {
-        mockStaticNice(RocksDBMetrics.class);
-        replay(RocksDBMetrics.class);
-        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
-        recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2, streamsMetrics, taskId1);
+        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
+        recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2);
         reset(recordingTrigger);
         replay(recordingTrigger);
 
@@ -204,9 +209,8 @@ public class RocksDBMetricsRecorderTest {
 
     @Test
     public void shouldThrowIfStatisticsToRemoveNotFound() {
-        mockStaticNice(RocksDBMetrics.class);
-        replay(RocksDBMetrics.class);
-        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
+        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
+
         assertThrows(
             IllegalStateException.class,
             () -> recorder.removeStatistics(SEGMENT_STORE_NAME_2)
@@ -215,9 +219,8 @@ public class RocksDBMetricsRecorderTest {
 
     @Test
     public void shouldRecordMetrics() {
-        setUpMetricsMock();
-        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
-        recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2, streamsMetrics, taskId1);
+        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
+        recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2);
         reset(statisticsToAdd1);
         reset(statisticsToAdd2);
 
@@ -303,10 +306,9 @@ public class RocksDBMetricsRecorderTest {
 
     @Test
     public void shouldCorrectlyHandleHitRatioRecordingsWithZeroHitsAndMisses() {
-        setUpMetricsMock();
-        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
         resetToNice(statisticsToAdd1);
-        expect(statisticsToAdd1.getTickerCount(anyObject())).andReturn(0L).anyTimes();
+        recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
+        expect(statisticsToAdd1.getTickerCount(anyObject())).andStubReturn(0L);
         replay(statisticsToAdd1);
         memtableHitRatioSensor.record(0);
         blockCacheDataHitRatioSensor.record(0);
@@ -355,4 +357,35 @@ public class RocksDBMetricsRecorderTest {
             .andReturn(numberOfFileErrorsSensor);
         replay(RocksDBMetrics.class);
     }
+
+    private void setUpMetricsStubMock() {
+        mockStatic(RocksDBMetrics.class);
+        final RocksDBMetricContext metricsContext =
+            new RocksDBMetricContext(THREAD_ID, taskId1.toString(), METRICS_SCOPE, STORE_NAME);
+        expect(RocksDBMetrics.bytesWrittenToDatabaseSensor(streamsMetrics, metricsContext))
+            .andStubReturn(bytesWrittenToDatabaseSensor);
+        expect(RocksDBMetrics.bytesReadFromDatabaseSensor(streamsMetrics, metricsContext))
+            .andStubReturn(bytesReadFromDatabaseSensor);
+        expect(RocksDBMetrics.memtableBytesFlushedSensor(streamsMetrics, metricsContext))
+            .andStubReturn(memtableBytesFlushedSensor);
+        expect(RocksDBMetrics.memtableHitRatioSensor(streamsMetrics, metricsContext))
+            .andStubReturn(memtableHitRatioSensor);
+        expect(RocksDBMetrics.writeStallDurationSensor(streamsMetrics, metricsContext))
+            .andStubReturn(writeStallDurationSensor);
+        expect(RocksDBMetrics.blockCacheDataHitRatioSensor(streamsMetrics, metricsContext))
+            .andStubReturn(blockCacheDataHitRatioSensor);
+        expect(RocksDBMetrics.blockCacheIndexHitRatioSensor(streamsMetrics, metricsContext))
+            .andStubReturn(blockCacheIndexHitRatioSensor);
+        expect(RocksDBMetrics.blockCacheFilterHitRatioSensor(streamsMetrics, metricsContext))
+            .andStubReturn(blockCacheFilterHitRatioSensor);
+        expect(RocksDBMetrics.bytesWrittenDuringCompactionSensor(streamsMetrics, metricsContext))
+            .andStubReturn(bytesWrittenDuringCompactionSensor);
+        expect(RocksDBMetrics.bytesReadDuringCompactionSensor(streamsMetrics, metricsContext))
+            .andStubReturn(bytesReadDuringCompactionSensor);
+        expect(RocksDBMetrics.numberOfOpenFilesSensor(streamsMetrics, metricsContext))
+            .andStubReturn(numberOfOpenFilesSensor);
+        expect(RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricsContext))
+            .andStubReturn(numberOfFileErrorsSensor);
+        replay(RocksDBMetrics.class);
+    }
 }
\ No newline at end of file


Mime
View raw message