kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject [1/3] kafka git commit: KAFKA-5754; Refactor Streams to use LogContext
Date Mon, 18 Sep 2017 08:53:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6055c7498 -> f305dd68f


http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
index f861d8f..a32094c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -44,7 +45,7 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
     };
 
     private final List<KeyValue<Long, byte[]>> windowStoreKvPairs = new ArrayList<>();
-    private final ThreadCache cache = new ThreadCache("testCache", 1000000L,  new MockStreamsMetrics(new
Metrics()));
+    private final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1000000L,
 new MockStreamsMetrics(new Metrics()));
     private final String namespace = "0.0-one";
     private final StateSerdes<String, String> stateSerdes = new StateSerdes<>("foo",
Serdes.String(), Serdes.String());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 5b89ba3..5a4f952 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -101,7 +102,7 @@ public class MeteredWindowStoreTest {
             Serdes.String(),
             Serdes.Long(),
             new NoOpRecordCollector(),
-            new ThreadCache("testCache", 0, streamsMetrics)) {
+            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)) {
 
             @Override
             public StreamsMetrics metrics() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
index 4368399..66cc9ad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -42,7 +43,7 @@ import static org.junit.Assert.assertTrue;
 public class RocksDBKeyValueStoreSupplierTest {
 
     private static final String STORE_NAME = "name";
-    private final ThreadCache cache = new ThreadCache("test", 1024, new MockStreamsMetrics(new
Metrics()));
+    private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new
MockStreamsMetrics(new Metrics()));
     private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
                                                                           Serdes.String(),
                                                                           Serdes.String(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 36d4c1f..9cb6ee5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -68,7 +69,7 @@ public class RocksDBSegmentedBytesStoreTest {
             Serdes.String(),
             Serdes.Long(),
             new NoOpRecordCollector(),
-            new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
         bytesStore.init(context, bytesStore);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
index f62edf8..19bfded 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
@@ -45,7 +46,7 @@ public class RocksDBSessionStoreSupplierTest {
 
     private static final String STORE_NAME = "name";
     private final List<ProducerRecord> logged = new ArrayList<>();
-    private final ThreadCache cache = new ThreadCache("test", 1024, new MockStreamsMetrics(new
Metrics()));
+    private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new
MockStreamsMetrics(new Metrics()));
     private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
         Serdes.String(),
         Serdes.String(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 1a452a5..b25d725 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
@@ -62,7 +63,7 @@ public class RocksDBSessionStoreTest {
                                            Serdes.String(),
                                            Serdes.Long(),
                                            new NoOpRecordCollector(),
-                                           new ThreadCache("testCache", 0, new MockStreamsMetrics(new
Metrics())));
+                                           new ThreadCache(new LogContext("testCache "),
0, new MockStreamsMetrics(new Metrics())));
         sessionStore.init(context, sessionStore);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 2559954..1f15a03 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -65,7 +66,7 @@ public class RocksDBStoreTest {
             Serdes.String(),
             Serdes.String(),
             new NoOpRecordCollector(),
-            new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
     }
 
     @After
@@ -113,7 +114,7 @@ public class RocksDBStoreTest {
             Serdes.String(),
             Serdes.Long(),
             new NoOpRecordCollector(),
-            new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
         tmpDir.setReadOnly();
 
         subject.openDB(tmpContext);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
index 5e1f131..bca6949 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.WindowStore;
@@ -43,7 +44,7 @@ public class RocksDBWindowStoreSupplierTest {
 
     private static final String STORE_NAME = "name";
     private WindowStore<String, String> store;
-    private final ThreadCache cache = new ThreadCache("test", 1024, new MockStreamsMetrics(new
Metrics()));
+    private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new
MockStreamsMetrics(new Metrics()));
     private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
                                                                           Serdes.String(),
                                                                           Serdes.String(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 6d13ca8..7736d9d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -72,10 +73,10 @@ public class RocksDBWindowStoreTest {
     private final StateSerdes<Integer, String> serdes = new StateSerdes<>("",
Serdes.Integer(), Serdes.String());
 
     private final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
-    private final ThreadCache cache = new ThreadCache("TestCache", DEFAULT_CACHE_SIZE_BYTES,
new MockStreamsMetrics(new Metrics()));
+    private final ThreadCache cache = new ThreadCache(new LogContext("TestCache "), DEFAULT_CACHE_SIZE_BYTES,
new MockStreamsMetrics(new Metrics()));
 
     private final Producer<byte[], byte[]> producer = new MockProducer<>(true,
Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer());
-    private final RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTestTask")
{
+    private final RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTestTask",
new LogContext("RocksDBWindowStoreTestTask ")) {
         @Override
         public <K1, V1> void send(final String topic,
                                   K1 key,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 0091e45..9c150c5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -57,7 +58,7 @@ public class SegmentIteratorTest {
                 Serdes.String(),
                 Serdes.String(),
                 new NoOpRecordCollector(),
-                new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+                new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new
Metrics())));
         segmentOne.openDB(context);
         segmentTwo.openDB(context);
         segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes());

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index cdaa1e0..37e0d92 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
@@ -46,7 +47,7 @@ public class SegmentsTest {
                                            Serdes.String(),
                                            Serdes.Long(),
                                            new NoOpRecordCollector(),
-                                           new ThreadCache("testCache", 0, new MockStreamsMetrics(new
Metrics())));
+                                           new ThreadCache(new LogContext("testCache "),
0, new MockStreamsMetrics(new Metrics())));
         segments = new Segments("test", 4 * 60 * 1000, NUM_SEGMENTS);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index a9f7738..8749e92 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -38,7 +39,7 @@ public class StoreChangeLoggerTest {
     private final Map<Integer, String> logged = new HashMap<>();
 
     private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic,
Integer.class, String.class),
-            new RecordCollectorImpl(null, "StoreChangeLoggerTest") {
+            new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest
")) {
                 @Override
                 public <K1, V1> void send(final String topic,
                                           final K1 key,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 38b021b..1368051 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
@@ -187,7 +188,7 @@ public class StreamThreadStateStoreProviderTest {
             Collections.singletonList(new TopicPartition(topicName, taskId.partition)),
             topology,
             clientSupplier.consumer,
-            new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener()),
+            new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener(),
new LogContext("test-stream-task ")),
             streamsConfig,
             new MockStreamsMetrics(new Metrics()),
             stateDirectory,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index ea91c79..16fd34b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.junit.Test;
@@ -40,6 +41,7 @@ public class ThreadCacheTest {
     final String namespace = "0.0-namespace";
     final String namespace1 = "0.1-namespace";
     final String namespace2 = "0.2-namespace";
+    private final LogContext logContext = new LogContext("testCache ");
 
     @Test
     public void basicPutGet() throws IOException {
@@ -50,7 +52,7 @@ public class ThreadCacheTest {
                 new KeyValue<>("K4", "V4"),
                 new KeyValue<>("K5", "V5"));
         final KeyValue<String, String> kv = toInsert.get(0);
-        ThreadCache cache = new ThreadCache("testCache",
+        ThreadCache cache = new ThreadCache(logContext,
                 toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(),
""),
             new MockStreamsMetrics(new Metrics()));
 
@@ -80,7 +82,7 @@ public class ThreadCacheTest {
         System.gc();
         long prevRuntimeMemory = runtime.totalMemory() - runtime.freeMemory();
 
-        ThreadCache cache = new ThreadCache("testCache", desiredCacheSize, new MockStreamsMetrics(new
Metrics()));
+        ThreadCache cache = new ThreadCache(logContext, desiredCacheSize, new MockStreamsMetrics(new
Metrics()));
         long size = cache.sizeBytes();
         assertEquals(size, 0);
         for (int i = 0; i < numElements; i++) {
@@ -153,7 +155,7 @@ public class ThreadCacheTest {
                 new KeyValue<>("K4", "V4"),
                 new KeyValue<>("K5", "V5"));
         final KeyValue<String, String> kv = toInsert.get(0);
-        ThreadCache cache = new ThreadCache("testCache",
+        ThreadCache cache = new ThreadCache(logContext,
                 memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
             new MockStreamsMetrics(new Metrics()));
         cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener()
{
@@ -182,7 +184,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldDelete() {
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new
Metrics()));
         final Bytes key = Bytes.wrap(new byte[]{0});
 
         cache.put(namespace, key, dirtyEntry(key.get()));
@@ -193,7 +195,7 @@ public class ThreadCacheTest {
     @Test
     public void shouldNotFlushAfterDelete() {
         final Bytes key = Bytes.wrap(new byte[]{0});
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new
Metrics()));
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
         cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener()
{
             @Override
@@ -213,7 +215,7 @@ public class ThreadCacheTest {
     @Test
     public void shouldNotBlowUpOnNonExistentKeyWhenDeleting() {
         final Bytes key = Bytes.wrap(new byte[]{0});
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new
Metrics()));
 
         cache.put(namespace, key, dirtyEntry(key.get()));
         assertNull(cache.delete(namespace, Bytes.wrap(new byte[]{1})));
@@ -221,13 +223,13 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldNotBlowUpOnNonExistentNamespaceWhenDeleting() {
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new
Metrics()));
         assertNull(cache.delete(namespace, Bytes.wrap(new byte[]{1})));
     }
 
     @Test
     public void shouldNotClashWithOverlappingNames() {
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new
Metrics()));
         final Bytes nameByte = Bytes.wrap(new byte[]{0});
         final Bytes name1Byte = Bytes.wrap(new byte[]{1});
         cache.put(namespace1, nameByte, dirtyEntry(nameByte.get()));
@@ -239,7 +241,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldPeekNextKey() {
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new
Metrics()));
         final Bytes theByte = Bytes.wrap(new byte[]{0});
         cache.put(namespace, theByte, dirtyEntry(theByte.get()));
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte,
Bytes.wrap(new byte[]{1}));
@@ -249,7 +251,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldGetSameKeyAsPeekNext() {
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new
Metrics()));
         final Bytes theByte = Bytes.wrap(new byte[]{0});
         cache.put(namespace, theByte, dirtyEntry(theByte.get()));
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte,
Bytes.wrap(new byte[]{1}));
@@ -258,21 +260,21 @@ public class ThreadCacheTest {
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowIfNoPeekNextKey() {
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new
Metrics()));
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new
byte[]{0}), Bytes.wrap(new byte[]{1}));
         iterator.peekNextKey();
     }
 
     @Test
     public void shouldReturnFalseIfNoNextKey() {
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new
Metrics()));
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new
byte[]{0}), Bytes.wrap(new byte[]{1}));
         assertFalse(iterator.hasNext());
     }
 
     @Test
     public void shouldPeekAndIterateOverRange() {
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new
Metrics()));
         final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
         for (final byte[] aByte : bytes) {
             cache.put(namespace, Bytes.wrap(aByte), dirtyEntry(aByte));
@@ -292,7 +294,7 @@ public class ThreadCacheTest {
     @Test
     public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() {
         final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
-        final ThreadCache cache = new ThreadCache("testCache", entrySize * 5, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, entrySize * 5, new MockStreamsMetrics(new
Metrics()));
         cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener()
{
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
@@ -314,7 +316,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldFlushDirtyEntriesForNamespace() {
-        final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new
Metrics()));
         final List<byte[]> received = new ArrayList<>();
         cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener()
{
             @Override
@@ -336,7 +338,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldNotFlushCleanEntriesForNamespace() {
-        final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new
Metrics()));
         final List<byte[]> received = new ArrayList<>();
         cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener()
{
             @Override
@@ -376,20 +378,20 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldEvictImmediatelyIfCacheSizeIsVerySmall() {
-        final ThreadCache cache = new ThreadCache("testCache", 1, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 1, new MockStreamsMetrics(new
Metrics()));
         shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(cache);
     }
 
     @Test
     public void shouldEvictImmediatelyIfCacheSizeIsZero() {
-        final ThreadCache cache = new ThreadCache("testCache", 0, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 0, new MockStreamsMetrics(new
Metrics()));
         shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(cache);
     }
 
     @Test
     public void shouldEvictAfterPutAll() {
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
-        final ThreadCache cache = new ThreadCache("testCache", 1, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 1, new MockStreamsMetrics(new
Metrics()));
         cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener()
{
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
@@ -406,7 +408,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldPutAll() {
-        final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new
Metrics()));
 
         cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new
byte[]{5})),
                                            KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new
byte[]{6}))));
@@ -417,7 +419,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldNotForwardCleanEntryOnEviction() {
-        final ThreadCache cache = new ThreadCache("testCache", 0, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 0, new MockStreamsMetrics(new
Metrics()));
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
         cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener()
{
             @Override
@@ -430,7 +432,7 @@ public class ThreadCacheTest {
     }
     @Test
     public void shouldPutIfAbsent() {
-        final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new
Metrics()));
         final Bytes key = Bytes.wrap(new byte[]{10});
         final byte[] value = {30};
         assertNull(cache.putIfAbsent(namespace, key, dirtyEntry(value)));
@@ -441,7 +443,7 @@ public class ThreadCacheTest {
     @Test
     public void shouldEvictAfterPutIfAbsent() {
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
-        final ThreadCache cache = new ThreadCache("testCache", 1, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 1, new MockStreamsMetrics(new
Metrics()));
         cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener()
{
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
@@ -460,7 +462,7 @@ public class ThreadCacheTest {
     @Test
     public void shouldNotLoopForEverWhenEvictingAndCurrentCacheIsEmpty() {
         final int maxCacheSizeInBytes = 100;
-        final ThreadCache threadCache = new ThreadCache("testCache", maxCacheSizeInBytes,
new MockStreamsMetrics(new Metrics()));
+        final ThreadCache threadCache = new ThreadCache(logContext, maxCacheSizeInBytes,
new MockStreamsMetrics(new Metrics()));
         // trigger a put into another cache on eviction from "name"
         threadCache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener()
{
             @Override
@@ -493,7 +495,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldCleanupNamedCacheOnClose() {
-        final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new
Metrics()));
         cache.put(namespace1, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
         cache.put(namespace2, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
         assertEquals(cache.size(), 2);
@@ -504,7 +506,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldReturnNullIfKeyIsNull() {
-        final ThreadCache threadCache = new ThreadCache("testCache", 10, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache threadCache = new ThreadCache(logContext, 10, new MockStreamsMetrics(new
Metrics()));
         threadCache.put(namespace, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
         assertNull(threadCache.get(namespace, null));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 474ef5c..7058f2f 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -48,6 +49,7 @@ public class KStreamTestDriver extends ExternalResource {
     private ProcessorTopology topology;
     private MockProcessorContext context;
     private ProcessorTopology globalTopology;
+    private final LogContext logContext = new LogContext("testCache ");
 
     @Deprecated
     public void setUp(final KStreamBuilder builder) {
@@ -81,7 +83,7 @@ public class KStreamTestDriver extends ExternalResource {
         builder.setApplicationId("TestDriver");
         topology = builder.build(null);
         globalTopology = builder.buildGlobalStateTopology();
-        final ThreadCache cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new
Metrics()));
         context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(),
cache);
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
         // init global topology first as it will add stores to the
@@ -122,7 +124,7 @@ public class KStreamTestDriver extends ExternalResource {
         topology = internalTopologyBuilder.build(null);
         globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
 
-        final ThreadCache cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new
Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new
Metrics()));
         context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(),
cache);
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
 
@@ -273,7 +275,7 @@ public class KStreamTestDriver extends ExternalResource {
 
     private class MockRecordCollector extends RecordCollectorImpl {
         MockRecordCollector() {
-            super(null, "KStreamTestDriver");
+            super(null, "KStreamTestDriver", new LogContext("KStreamTestDriver "));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 148511a..27e9309 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.InternalTopologyAccessor;
@@ -206,7 +207,7 @@ public class ProcessorTopologyTestDriver {
 
         final StateDirectory stateDirectory = new StateDirectory(APPLICATION_ID, TestUtils.tempDirectory().getPath(),
Time.SYSTEM);
         final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
-        final ThreadCache cache = new ThreadCache("mock", 1024 * 1024, streamsMetrics);
+        final ThreadCache cache = new ThreadCache(new LogContext("mock "), 1024 * 1024, streamsMetrics);
 
         if (globalTopology != null) {
             final MockConsumer<byte[], byte[]> globalConsumer = createGlobalConsumer();
@@ -235,7 +236,8 @@ public class ProcessorTopologyTestDriver {
                                   consumer,
                                   new StoreChangelogReader(
                                       createRestoreConsumer(topology.storeToChangelogTopic()),
-                                      stateRestoreListener),
+                                      stateRestoreListener,
+                                          new LogContext("topology-test-driver ")),
                                   config,
                                   streamsMetrics, stateDirectory,
                                   cache,


Mime
View raw message