kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-5671 Followup: Remove reflections in unit test classes
Date Wed, 02 Aug 2017 22:13:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 630e9c567 -> 125d69cae


http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 4022a47..fad7116 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -47,7 +48,6 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -332,13 +332,9 @@ public class StandbyTaskTest {
         final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
         builder.stream(null, null, null, null, "topic").groupByKey().count("my-store");
 
-        // TODO: we should refactor this to avoid usage of reflection
-        final Field internalTopologyBuilderField = builder.getClass().getDeclaredField("internalTopologyBuilder");
-        internalTopologyBuilderField.setAccessible(true);
-        final InternalTopologyBuilder internalTopologyBuilder = (InternalTopologyBuilder) internalTopologyBuilderField.get(builder);
-
+        final StreamsConfig config = createConfig(baseDir);
+        final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);
         final ProcessorTopology topology = internalTopologyBuilder.setApplicationId(applicationId).build(0);
-        StreamsConfig config = createConfig(baseDir);
 
         new StandbyTask(taskId, applicationId, partitions, topology, consumer, changelogReader, config,
             new MockStreamsMetrics(new Metrics()), stateDirectory);

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 7b2956b..5fc3807 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -29,12 +29,12 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
@@ -940,7 +940,7 @@ public class StreamPartitionAssignorTest {
 
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);
+        final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
         internalTopologyBuilder.setApplicationId(applicationId);
 
         KStream<Object, Object> stream1 = builder
@@ -1086,7 +1086,7 @@ public class StreamPartitionAssignorTest {
         final StreamsConfig config = new StreamsConfig(props);
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);
+        final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
         internalTopologyBuilder.setApplicationId(applicationId);
 
         builder.stream("topic1").groupByKey().count("count");

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 6417f57..465f8be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -104,11 +105,7 @@ public class StreamThreadTest {
     public void setUp() throws Exception {
         processId = UUID.randomUUID();
 
-        // TODO: we should refactor this to avoid usage of reflection
-        final Field internalTopologyBuilderField = internalStreamsBuilder.getClass().getDeclaredField("internalTopologyBuilder");
-        internalTopologyBuilderField.setAccessible(true);
-        internalTopologyBuilder = (InternalTopologyBuilder) internalTopologyBuilderField.get(internalStreamsBuilder);
-
+        internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(internalStreamsBuilder);
         internalTopologyBuilder.setApplicationId(applicationId);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index d394abe..ccba038 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -24,10 +24,10 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.StreamsMetadata;
@@ -90,7 +90,7 @@ public class StreamsMetadataStateTest {
 
         builder.globalTable("global-topic", "global-table");
 
-        InternalStreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("appId");
+        StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("appId");
 
         topic1P0 = new TopicPartition("topic-one", 0);
         topic1P1 = new TopicPartition("topic-one", 1);
@@ -116,7 +116,7 @@ public class StreamsMetadataStateTest {
                 new PartitionInfo("topic-four", 0, null, null, null));
 
         cluster = new Cluster(null, Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet());
-        discovery = new StreamsMetadataState(InternalStreamsBuilderTest.internalTopologyBuilder(builder), hostOne);
+        discovery = new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), hostOne);
         discovery.onChange(hostToPartitions, cluster);
         partitioner = new StreamPartitioner<String, Object>() {
             @Override
@@ -128,7 +128,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldNotThrowNPEWhenOnChangeNotCalled() throws Exception {
-        new StreamsMetadataState(InternalStreamsBuilderTest.internalTopologyBuilder(builder), hostOne).getAllMetadataForStore("store");
+        new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), hostOne).getAllMetadataForStore("store");
     }
 
     @Test
@@ -295,7 +295,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() throws Exception {
-        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(InternalStreamsBuilderTest.internalTopologyBuilder(builder), StreamsMetadataState.UNKNOWN_HOST);
+        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), StreamsMetadataState.UNKNOWN_HOST);
         streamsMetadataState.onChange(hostToPartitions, cluster);
         assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", Serdes.String().serializer()));
     }
@@ -308,7 +308,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() throws Exception {
-        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(InternalStreamsBuilderTest.internalTopologyBuilder(builder), StreamsMetadataState.UNKNOWN_HOST);
+        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), StreamsMetadataState.UNKNOWN_HOST);
         streamsMetadataState.onChange(hostToPartitions, cluster);
         assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", partitioner));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 09200f2..474ef5c 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -36,7 +36,6 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.junit.rules.ExternalResource;
 
 import java.io.File;
-import java.lang.reflect.Field;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -50,18 +49,22 @@ public class KStreamTestDriver extends ExternalResource {
     private MockProcessorContext context;
     private ProcessorTopology globalTopology;
 
+    @Deprecated
     public void setUp(final KStreamBuilder builder) {
         setUp(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
     }
 
+    @Deprecated
     public void setUp(final KStreamBuilder builder, final File stateDir) {
         setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
     }
 
+    @Deprecated
     public void setUp(final KStreamBuilder builder, final File stateDir, final long cacheSize) {
         setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize);
     }
 
+    @Deprecated
     public void setUp(final KStreamBuilder builder,
                       final File stateDir,
                       final Serde<?> keySerde,
@@ -69,6 +72,7 @@ public class KStreamTestDriver extends ExternalResource {
         setUp(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES);
     }
 
+    @Deprecated
     public void setUp(final KStreamBuilder builder,
                       final File stateDir,
                       final Serde<?> keySerde,
@@ -112,26 +116,16 @@ public class KStreamTestDriver extends ExternalResource {
                       final Serde<?> keySerde,
                       final Serde<?> valSerde,
                       final long cacheSize) {
-        // TODO: we should refactor this to avoid usage of reflection
-        final InternalTopologyBuilder internalTopologyBuilder;
-        try {
-            final Field internalStreamsBuilderField = builder.getClass().getDeclaredField("internalStreamsBuilder");
-            internalStreamsBuilderField.setAccessible(true);
-            final InternalStreamsBuilder internalStreamsBuilder = (InternalStreamsBuilder) internalStreamsBuilderField.get(builder);
-
-            final Field internalTopologyBuilderField = internalStreamsBuilder.getClass().getDeclaredField("internalTopologyBuilder");
-            internalTopologyBuilderField.setAccessible(true);
-            internalTopologyBuilder = (InternalTopologyBuilder) internalTopologyBuilderField.get(internalStreamsBuilder);
-        } catch (final NoSuchFieldException | IllegalAccessException e) {
-            throw new RuntimeException(e);
-        }
+        final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
 
         internalTopologyBuilder.setApplicationId("TestDriver");
         topology = internalTopologyBuilder.build(null);
         globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
+
         final ThreadCache cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new Metrics()));
         context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
+
         // init global topology first as it will add stores to the
         // store map that are required for joins etc.
         if (globalTopology != null) {
@@ -304,7 +298,7 @@ public class KStreamTestDriver extends ExternalResource {
                                 final Long timestamp,
                                 final Serializer<K> keySerializer,
                                 final Serializer<V> valueSerializer) {
-        // The serialization is skipped.
+            // The serialization is skipped.
             if (sourceNodeByTopicName(topic) != null) {
                 process(topic, key, value);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d69ca/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 30ec90a..8bf8553 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.test;
 
-
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -50,17 +49,6 @@ public class StreamsTestUtils {
 
     }
 
-    /**
-     * Streams configuration with a random generated UUID for the application id
-     */
-    public static Properties getStreamsConfig(String bootstrapServer, String keySerdeClassName, String valueSerdeClassName) {
-        return getStreamsConfig(UUID.randomUUID().toString(),
-                bootstrapServer,
-                keySerdeClassName,
-                valueSerdeClassName,
-                new Properties());
-    }
-
     public static Properties minimalStreamsConfig() {
         final Properties properties = new Properties();
         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
@@ -68,7 +56,6 @@ public class StreamsTestUtils {
         return properties;
     }
 
-
     public static <K, V> List<KeyValue<K, V>> toList(final Iterator<KeyValue<K, V>> iterator) {
         final List<KeyValue<K, V>> results = new ArrayList<>();
 
@@ -77,5 +64,4 @@ public class StreamsTestUtils {
         }
         return results;
     }
-
 }


Mime
View raw message