kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6729: Follow up; disable logging for source KTable. (#5038)
Date Sun, 20 May 2018 17:24:17 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9752cca  KAFKA-6729: Follow up; disable logging for source KTable. (#5038)
9752cca is described below

commit 9752ccad552543964c2ba92a152cb67636233e13
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Sun May 20 10:24:07 2018 -0700

    KAFKA-6729: Follow up; disable logging for source KTable. (#5038)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>
---
 .../kafka/streams/kstream/internals/InternalStreamsBuilder.java     | 3 +++
 .../kafka/streams/processor/internals/InternalTopologyBuilder.java  | 6 +++++-
 .../src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java  | 6 +++++-
 .../streams/processor/internals/StreamsPartitionAssignorTest.java   | 1 -
 4 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 480794c..0a19b4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -72,6 +72,9 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     public <K, V> KTable<K, V> table(final String topic,
                                      final ConsumedInternal<K, V> consumed,
                                      final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        // explicitly disable logging for source table materialized stores
+        materialized.withLoggingDisabled();
+
         final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized)
                 .materialize();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 575ac01..1651bbd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -120,7 +120,7 @@ public class InternalTopologyBuilder {
 
     private Map<Integer, Set<String>> nodeGroups = null;
 
-    interface StateStoreFactory {
+    public interface StateStoreFactory {
         Set<String> users();
         boolean loggingEnabled();
         StateStore build();
@@ -1799,4 +1799,8 @@ public class InternalTopologyBuilder {
     public synchronized Set<String> getSourceTopicNames() {
         return sourceTopicNames;
     }
+
+    public synchronized Map<String, StateStoreFactory> getStateStores() {
+        return stateFactories;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 0a1e6df..37101de 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -279,7 +279,11 @@ public class StreamsBuilderTest {
 
         final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
 
-        assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(), equalTo(Collections.singleton("topic")));
+        assertThat(internalTopologyBuilder.getStateStores().keySet(), equalTo(Collections.singleton("store")));
+
+        assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(), equalTo(false));
+
+        assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(), equalTo(true));
     }
     
     @Test(expected = TopologyException.class)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index cc507d6..37b03fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -796,7 +796,6 @@ public class StreamsPartitionAssignorTest {
         expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
         expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
         expectedCreatedInternalTopics.put(applicationId + "-KSTREAM-MAP-0000000001-repartition", 4);
-        expectedCreatedInternalTopics.put("topic3", 4);     // the source topic is reused as changelog topics
 
         // check if all internal topics were created as expected
         assertThat(mockInternalTopicManager.readyTopics, equalTo(expectedCreatedInternalTopics));

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message