kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR: fixes lgtm.com warnings (#4582)
Date Sun, 25 Feb 2018 20:26:21 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5df535e  MINOR: fixes lgtm.com warnings (#4582)
5df535e is described below

commit 5df535e8a349771942050f1e3fd58851f413fa3a
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Sun Feb 25 12:26:18 2018 -0800

    MINOR: fixes lgtm.com warnings (#4582)
    
    fixes lgmt.com warnings
    cleanup PrintForeachAction and Printed
    
    Author: Matthias J. Sax <matthias@confluent.io>
    
    Reviewers: Sebastian Bauersfeld <sebastianbauersfeld@gmx.de>, Damian Guy <damian@confluent.io>,
Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../org/apache/kafka/common/cache/LRUCache.java    |  2 +-
 .../main/scala/kafka/tools/StreamsResetter.java    | 30 +++++++++-----------
 .../org/apache/kafka/streams/kstream/Printed.java  | 19 ++++++-------
 .../streams/kstream/internals/KTableImpl.java      | 19 +++++++------
 .../kstream/internals/PrintForeachAction.java      | 32 +++++++++++-----------
 .../streams/kstream/internals/PrintedInternal.java |  2 +-
 .../processor/internals/InternalTopicManager.java  |  2 +-
 .../internals/StreamPartitionAssignor.java         |  4 +--
 .../streams/processor/internals/StreamThread.java  | 12 ++++----
 .../apache/kafka/streams/kstream/PrintedTest.java  | 10 +++++--
 .../kstream/internals/KStreamPrintTest.java        | 16 ++---------
 11 files changed, 70 insertions(+), 78 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java b/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java
index bdc67ac..672cb65 100644
--- a/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java
+++ b/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java
@@ -29,7 +29,7 @@ public class LRUCache<K, V> implements Cache<K, V> {
         cache = new LinkedHashMap<K, V>(16, .75f, true) {
             @Override
             protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
-                return size() > maxSize;
+                return this.size() > maxSize; // require this. prefix to make lgtm.com
happy
             }
         };
     }
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 31c69ee..f88fce7 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -386,7 +386,7 @@ public class StreamsResetter {
                 shiftOffsetsBy(client, inputTopicPartitions, options.valueOf(shiftByOption));
             } else if (options.has(toDatetimeOption)) {
                 final String ts = options.valueOf(toDatetimeOption);
-                final Long timestamp = getDateTime(ts);
+                final long timestamp = getDateTime(ts);
                 resetToDatetime(client, inputTopicPartitions, timestamp);
             } else if (options.has(byDurationOption)) {
                 final String duration = options.valueOf(byDurationOption);
@@ -401,8 +401,7 @@ public class StreamsResetter {
             }
 
             for (final TopicPartition p : inputTopicPartitions) {
-                final Long position = client.position(p);
-                System.out.println("Topic: " + p.topic() + " Partition: " + p.partition()
+ " Offset: " + position);
+                System.out.println("Topic: " + p.topic() + " Partition: " + p.partition()
+ " Offset: " + client.position(p));
             }
         }
     }
@@ -416,8 +415,7 @@ public class StreamsResetter {
             checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
 
         for (final TopicPartition topicPartition : inputTopicPartitions) {
-            final Long offset = validatedTopicPartitionsAndOffset.get(topicPartition);
-            client.seek(topicPartition, offset);
+            client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition));
         }
     }
 
@@ -429,7 +427,7 @@ public class StreamsResetter {
     private void resetByDuration(Consumer<byte[], byte[]> client, Set<TopicPartition>
inputTopicPartitions, Duration duration) throws DatatypeConfigurationException {
         final Date now = new Date();
         duration.negate().addTo(now);
-        final Long timestamp = now.getTime();
+        final long timestamp = now.getTime();
 
         final Map<TopicPartition, Long> topicPartitionsAndTimes = new HashMap<>(inputTopicPartitions.size());
         for (final TopicPartition topicPartition : inputTopicPartitions) {
@@ -439,8 +437,7 @@ public class StreamsResetter {
         final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
 
         for (final TopicPartition topicPartition : inputTopicPartitions) {
-            final Long offset = topicPartitionsAndOffset.get(topicPartition).offset();
-            client.seek(topicPartition, offset);
+            client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
         }
     }
 
@@ -453,20 +450,19 @@ public class StreamsResetter {
         final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
 
         for (final TopicPartition topicPartition : inputTopicPartitions) {
-            final Long offset = topicPartitionsAndOffset.get(topicPartition).offset();
-            client.seek(topicPartition, offset);
+            client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
         }
     }
 
     // visible for testing
-    public void shiftOffsetsBy(Consumer<byte[], byte[]> client, Set<TopicPartition>
inputTopicPartitions, Long shiftBy) {
+    public void shiftOffsetsBy(Consumer<byte[], byte[]> client, Set<TopicPartition>
inputTopicPartitions, long shiftBy) {
         final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
         final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
 
         final Map<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<>(inputTopicPartitions.size());
         for (final TopicPartition topicPartition : inputTopicPartitions) {
-            final Long position = client.position(topicPartition);
-            final Long offset = position + shiftBy;
+            final long position = client.position(topicPartition);
+            final long offset = position + shiftBy;
             topicPartitionsAndOffset.put(topicPartition, offset);
         }
 
@@ -497,7 +493,7 @@ public class StreamsResetter {
     }
 
     // visible for testing
-    public Long getDateTime(String timestamp) throws ParseException {
+    public long getDateTime(String timestamp) throws ParseException {
         final String[] timestampParts = timestamp.split("T");
         if (timestampParts.length < 2) {
             throw new ParseException("Error parsing timestamp. It does not contain a 'T'
according to ISO8601 format", timestamp.length());
@@ -549,10 +545,10 @@ public class StreamsResetter {
                                                        final Map<TopicPartition, Long>
endOffsets) {
         final Map<TopicPartition, Long> validatedTopicPartitionsOffsets = new HashMap<>();
         for (final Map.Entry<TopicPartition, Long> topicPartitionAndOffset : inputTopicPartitionsAndOffset.entrySet())
{
-            final Long endOffset = endOffsets.get(topicPartitionAndOffset.getKey());
-            final Long offset = topicPartitionAndOffset.getValue();
+            final long endOffset = endOffsets.get(topicPartitionAndOffset.getKey());
+            final long offset = topicPartitionAndOffset.getValue();
             if (offset < endOffset) {
-                final Long beginningOffset = beginningOffsets.get(topicPartitionAndOffset.getKey());
+                final long beginningOffset = beginningOffsets.get(topicPartitionAndOffset.getKey());
                 if (offset > beginningOffset) {
                     validatedTopicPartitionsOffsets.put(topicPartitionAndOffset.getKey(),
offset);
                 } else {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
index 8d2c22a..5a1d07f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
@@ -19,9 +19,8 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.streams.errors.TopologyException;
 
 import java.io.FileNotFoundException;
-import java.io.PrintWriter;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.StandardCharsets;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
 import java.util.Objects;
 
 /**
@@ -32,7 +31,7 @@ import java.util.Objects;
  * @see KStream#print(Printed)
  */
 public class Printed<K, V> {
-    protected final PrintWriter printWriter;
+    protected final OutputStream outputStream;
     protected String label;
     protected KeyValueMapper<? super K, ? super V, String> mapper = new KeyValueMapper<K,
V, String>() {
         @Override
@@ -41,8 +40,8 @@ public class Printed<K, V> {
         }
     };
 
-    private Printed(final PrintWriter printWriter) {
-        this.printWriter = printWriter;
+    private Printed(final OutputStream outputStream) {
+        this.outputStream = outputStream;
     }
 
     /**
@@ -50,7 +49,7 @@ public class Printed<K, V> {
      * @param printed   instance of {@link Printed} to copy
      */
     protected Printed(final Printed<K, V> printed) {
-        this.printWriter = printed.printWriter;
+        this.outputStream = printed.outputStream;
         this.label = printed.label;
         this.mapper = printed.mapper;
     }
@@ -69,8 +68,8 @@ public class Printed<K, V> {
             throw new TopologyException("filePath can't be an empty string");
         }
         try {
-            return new Printed<>(new PrintWriter(filePath, StandardCharsets.UTF_8.name()));
-        } catch (final FileNotFoundException | UnsupportedEncodingException e) {
+            return new Printed<>(new FileOutputStream(filePath));
+        } catch (final FileNotFoundException e) {
             throw new TopologyException("Unable to write stream to file at [" + filePath
+ "] " + e.getMessage());
         }
     }
@@ -83,7 +82,7 @@ public class Printed<K, V> {
      * @return a new Printed instance
      */
     public static <K, V> Printed<K, V> toSysOut() {
-        return new Printed<>((PrintWriter) null);
+        return new Printed<>(System.out);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index a746d31..11b8c51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -39,9 +39,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.io.FileNotFoundException;
-import java.io.PrintWriter;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.StandardCharsets;
+import java.io.FileOutputStream;
 import java.util.Objects;
 import java.util.Set;
 
@@ -346,7 +344,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
                       final String label) {
         Objects.requireNonNull(label, "label can't be null");
         final String name = builder.newProcessorName(PRINTING_NAME);
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null,
defaultKeyValueMapper, label)), this.name);
+        builder.internalTopologyBuilder.addProcessor(
+            name,
+            new KStreamPrint<>(new PrintForeachAction<>(System.out, defaultKeyValueMapper,
label)),
+            this.name);
     }
 
     @SuppressWarnings("deprecation")
@@ -384,11 +385,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
         if (filePath.trim().isEmpty()) {
             throw new TopologyException("filePath can't be an empty string");
         }
-        String name = builder.newProcessorName(PRINTING_NAME);
+        final String name = builder.newProcessorName(PRINTING_NAME);
         try {
-            PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name());
-            builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new
PrintForeachAction(printWriter, defaultKeyValueMapper, label)), this.name);
-        } catch (final FileNotFoundException | UnsupportedEncodingException e) {
+            builder.internalTopologyBuilder.addProcessor(
+                name,
+                new KStreamPrint<>(new PrintForeachAction<>(new FileOutputStream(filePath),
defaultKeyValueMapper, label)),
+                this.name);
+        } catch (final FileNotFoundException e) {
             throw new TopologyException(String.format("Unable to write stream to file at
[%s] %s", filePath, e.getMessage()));
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintForeachAction.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintForeachAction.java
index dcdd44f..174319f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintForeachAction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintForeachAction.java
@@ -19,26 +19,30 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
 
 public class PrintForeachAction<K, V> implements ForeachAction<K, V> {
 
     private final String label;
     private final PrintWriter printWriter;
+    private final boolean closable;
     private final KeyValueMapper<? super K, ? super V, String> mapper;
+
     /**
-     * Print customized output with given writer. The PrintWriter can be null in order to
-     * distinguish between {@code System.out} and the others. If the PrintWriter is {@code
PrintWriter(System.out)},
-     * then it would close {@code System.out} output stream.
-     * <p>
-     * Afterall, not to pass in {@code PrintWriter(System.out)} but {@code null} instead.
+     * Print customized output with given writer. The {@link OutputStream} can be {@link
System#out} or the others.
      *
-     * @param printWriter Use {@code System.out.println} if {@code null}.
+     * @param outputStream The output stream to write to.
      * @param mapper The mapper which can allow user to customize output will be printed.
      * @param label The given name will be printed.
      */
-    public PrintForeachAction(final PrintWriter printWriter, final KeyValueMapper<? super
K, ? super V, String> mapper, final String label) {
-        this.printWriter = printWriter;
+    PrintForeachAction(final OutputStream outputStream,
+                       final KeyValueMapper<? super K, ? super V, String> mapper,
+                       final String label) {
+        this.printWriter = new PrintWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
+        this.closable = outputStream != System.out && outputStream != System.err;
         this.mapper = mapper;
         this.label = label;
     }
@@ -46,18 +50,14 @@ public class PrintForeachAction<K, V> implements ForeachAction<K,
V> {
     @Override
     public void apply(final K key, final V value) {
         final String data = String.format("[%s]: %s", label, mapper.apply(key, value));
-        if (printWriter == null) {
-            System.out.println(data);
-        } else {
-            printWriter.println(data);
-        }
+        printWriter.println(data);
     }
 
     public void close() {
-        if (printWriter == null) {
-            System.out.flush();
-        } else {
+        if (closable) {
             printWriter.close();
+        } else {
+            printWriter.flush();
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
index 7e1a02d..45e2513 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
@@ -31,6 +31,6 @@ public class PrintedInternal<K, V> extends Printed<K, V> {
      * @return the {@code ProcessorSupplier} to be used for printing
      */
     public ProcessorSupplier<K, V> build(final String processorName) {
-        return new KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper,
label != null ? label : processorName));
+        return new KStreamPrint<>(new PrintForeachAction<>(outputStream, mapper,
label != null ? label : processorName));
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 05d079b..aeff946 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -221,7 +221,7 @@ public class InternalTopicManager {
                                                              final Map<String, Integer>
existingTopicNamesPartitions) {
         final Set<InternalTopicConfig> topicsToBeCreated = new HashSet<>();
         for (final InternalTopicConfig topic : topicsPartitionsMap) {
-            final Integer numberOfPartitions = topic.numberOfPartitions();
+            final int numberOfPartitions = topic.numberOfPartitions();
             if (existingTopicNamesPartitions.containsKey(topic.name())) {
                 if (!existingTopicNamesPartitions.get(topic.name()).equals(numberOfPartitions))
{
                     final String errorMsg = String.format("Existing internal topic %s has
invalid partitions: " +
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 2a08308..2a26272 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -377,7 +377,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new
HashMap<>();
         for (Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet())
{
             final String topic = entry.getKey();
-            final Integer numPartitions = entry.getValue().numPartitions;
+            final int numPartitions = entry.getValue().numPartitions;
 
             for (int partition = 0; partition < numPartitions; partition++) {
                 allRepartitionTopicPartitions.put(new TopicPartition(topic, partition),
@@ -638,7 +638,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         for (final InternalTopicMetadata metadata : topicPartitions.values()) {
             final InternalTopicConfig topic = metadata.config;
-            final Integer numPartitions = metadata.numPartitions;
+            final int numPartitions = metadata.numPartitions;
 
             if (numPartitions == NOT_AVAILABLE) {
                 continue;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 5e25d02..61a22be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -894,15 +894,13 @@ public class StreamThread extends Thread {
      * @param records Records, can be null
      */
     private void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
-        if (records != null && !records.isEmpty()) {
-            int numAddedRecords = 0;
+        int numAddedRecords = 0;
 
-            for (final TopicPartition partition : records.partitions()) {
-                final StreamTask task = taskManager.activeTask(partition);
-                numAddedRecords += task.addRecords(partition, records.records(partition));
-            }
-            streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords,
timerStartedMs);
+        for (final TopicPartition partition : records.partitions()) {
+            final StreamTask task = taskManager.activeTask(partition);
+            numAddedRecords += task.addRecords(partition, records.records(partition));
         }
+        streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
index adec9ff..a50fce9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
@@ -41,11 +41,12 @@ public class PrintedTest {
 
     private final PrintStream originalSysOut = System.out;
     private final ByteArrayOutputStream sysOut = new ByteArrayOutputStream();
-    private final Printed<String, Integer> sysOutPrinter = Printed.toSysOut();
+    private Printed<String, Integer> sysOutPrinter;
 
     @Before
     public void before() {
         System.setOut(new PrintStream(sysOut));
+        sysOutPrinter = Printed.toSysOut();
     }
 
     @After
@@ -72,7 +73,10 @@ public class PrintedTest {
     @Test
     public void shouldCreateProcessorThatPrintsToStdOut() throws UnsupportedEncodingException
{
         final ProcessorSupplier<String, Integer> supplier = new PrintedInternal<>(sysOutPrinter).build("processor");
-        supplier.get().process("good", 2);
+        final Processor<String, Integer> processor = supplier.get();
+
+        processor.process("good", 2);
+        processor.close();
         assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]:
good, 2\n"));
     }
 
@@ -83,6 +87,7 @@ public class PrintedTest {
                 .get();
 
         processor.process("hello", 3);
+        processor.close();
         assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[label]: hello,
3\n"));
     }
 
@@ -97,6 +102,7 @@ public class PrintedTest {
                 })).build("processor")
                 .get();
         processor.process("hello", 1);
+        processor.close();
         assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]:
hello -> 1\n"));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index e1a014d..3ba88e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -16,22 +16,16 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-
 import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
 import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
 
@@ -39,9 +33,6 @@ import static org.junit.Assert.assertEquals;
 
 public class KStreamPrintTest {
 
-    private final Serde<Integer> intSerd = Serdes.Integer();
-    private final Serde<String> stringSerd = Serdes.String();
-    private PrintWriter printWriter;
     private ByteArrayOutputStream byteOutStream;
 
     private KeyValueMapper<Integer, String, String> mapper;
@@ -49,9 +40,8 @@ public class KStreamPrintTest {
     private Processor printProcessor;
 
     @Before
-    public void setUp() {
+    public void setUp() throws Exception {
         byteOutStream = new ByteArrayOutputStream();
-        printWriter = new PrintWriter(new OutputStreamWriter(byteOutStream, StandardCharsets.UTF_8));
 
         mapper = new KeyValueMapper<Integer, String, String>() {
             @Override
@@ -60,7 +50,7 @@ public class KStreamPrintTest {
             }
         };
 
-        kStreamPrint = new KStreamPrint<>(new PrintForeachAction<>(printWriter,
mapper, "test-stream"));
+        kStreamPrint = new KStreamPrint<>(new PrintForeachAction<>(byteOutStream,
mapper, "test-stream"));
 
         printProcessor = kStreamPrint.get();
         ProcessorContext processorContext = EasyMock.createNiceMock(ProcessorContext.class);
@@ -98,7 +88,7 @@ public class KStreamPrintTest {
         for (KeyValue<K, V> record: inputRecords) {
             printProcessor.process(record.key, record.value);
         }
-        printWriter.flush();
+        printProcessor.close();
         assertFlushData(expectedResult, byteOutStream);
     }
 }

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message