kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: KAFKA-4894; Fix findbugs "default character set in use" warnings
Date Mon, 20 Mar 2017 20:52:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 783900c25 -> 5a2fcdd6d


KAFKA-4894; Fix findbugs "default character set in use" warnings

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #2683 from cmccabe/KAFKA-4894


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5a2fcdd6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5a2fcdd6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5a2fcdd6

Branch: refs/heads/trunk
Commit: 5a2fcdd6d480e9f003cc49a59d5952ba4c515a71
Parents: 783900c
Author: Colin P. Mccabe <cmccabe@confluent.io>
Authored: Mon Mar 20 13:52:35 2017 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Mar 20 13:52:35 2017 -0700

----------------------------------------------------------------------
 .../internals/ErrorLoggingCallback.java         |  6 ++-
 .../org/apache/kafka/common/utils/Shell.java    |  7 ++-
 .../kafka/connect/file/FileStreamSinkTask.java  |  7 ++-
 .../connect/file/FileStreamSourceTask.java      |  5 +-
 .../kafka/connect/runtime/AbstractHerder.java   |  3 +-
 .../kafka/connect/runtime/rest/RestServer.java  |  3 +-
 .../coordinator/GroupMetadataManager.scala      | 17 +++----
 .../scala/kafka/server/OffsetCheckpoint.scala   |  5 +-
 .../scala/kafka/tools/ConsoleConsumer.scala     | 24 ++++++----
 .../scala/kafka/tools/ConsoleProducer.scala     | 11 +++--
 .../scala/kafka/tools/EndToEndLatency.scala     |  5 +-
 .../scala/kafka/tools/ExportZkOffsets.scala     |  6 ++-
 .../scala/kafka/tools/ImportZkOffsets.scala     |  6 +--
 .../scala/kafka/tools/ProducerPerformance.scala |  5 +-
 .../kafka/tools/StateChangeLogMerger.scala      |  3 +-
 .../admin/BrokerApiVersionsCommandTest.scala    |  2 +-
 .../kafka/api/BaseProducerSendTest.scala        | 48 ++++++++++++--------
 .../kafka/admin/DeleteConsumerGroupTest.scala   |  6 ++-
 .../kafka/log4jappender/KafkaLog4jAppender.java |  4 +-
 .../streams/kstream/internals/KStreamImpl.java  | 14 +++---
 .../streams/kstream/internals/KTableImpl.java   | 11 +++--
 .../kstream/internals/KeyValuePrinter.java      | 43 +++++++++---------
 .../state/internals/OffsetCheckpoint.java       | 12 ++---
 .../internals/KeyValuePrinterProcessorTest.java | 36 +++++++++------
 24 files changed, 168 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
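
Background for the diff below: findbugs flags byte/char conversions that silently use the JVM's platform default character set (new String(byte[]), String.getBytes(), FileReader, and PrintStream/InputStreamReader constructors without a charset argument), because their result depends on the machine's locale settings. Every hunk in this patch applies the same fix: pass java.nio.charset.StandardCharsets.UTF_8 explicitly. A minimal standalone sketch of the difference, not part of the patch:

import java.nio.charset.StandardCharsets;

public class DefaultCharsetSketch {
    public static void main(String[] args) {
        byte[] payload = "käse".getBytes(StandardCharsets.UTF_8);

        // Relies on the platform default charset -- the call findbugs warns about.
        String fragile = new String(payload);

        // Explicit charset: decodes identically on every JVM.
        String stable = new String(payload, StandardCharsets.UTF_8);

        System.out.println(fragile);
        System.out.println(stable);
    }
}

Decoding those UTF-8 bytes on a JVM whose default charset is not UTF-8 (for example ISO-8859-1) yields mojibake such as "kÃ¤se", which is exactly the class of bug the explicit overload rules out.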


http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
index e980173..07a2878 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
@@ -21,6 +21,8 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
+
 public class ErrorLoggingCallback implements Callback {
     private static final Logger log = LoggerFactory.getLogger(ErrorLoggingCallback.class);
     private String topic;
@@ -44,9 +46,9 @@ public class ErrorLoggingCallback implements Callback {
     public void onCompletion(RecordMetadata metadata, Exception e) {
         if (e != null) {
             String keyString = (key == null) ? "null" :
-                    logAsString ? new String(key) : key.length + " bytes";
+                    logAsString ? new String(key, StandardCharsets.UTF_8) : key.length + " bytes";
             String valueString = (valueLength == -1) ? "null" :
-                    logAsString ? new String(value) : valueLength + " bytes";
+                    logAsString ? new String(value, StandardCharsets.UTF_8) : valueLength + " bytes";
             log.error("Error when sending message to topic {} with key: {}, value: {} with error:",
                     topic, keyString, valueString, e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/clients/src/main/java/org/apache/kafka/common/utils/Shell.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Shell.java b/clients/src/main/java/org/apache/kafka/common/utils/Shell.java
index 3c765b5..0a7e1af 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Shell.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Shell.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.utils;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -87,8 +88,10 @@ abstract public class Shell {
             //One time scheduling.
             timeoutTimer.schedule(new ShellTimeoutTimerTask(this), timeout);
         }
-        final BufferedReader errReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
-        BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+        final BufferedReader errReader = new BufferedReader(
+            new InputStreamReader(process.getErrorStream(), StandardCharsets.UTF_8));
+        BufferedReader inReader = new BufferedReader(
+            new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8));
         final StringBuffer errMsg = new StringBuffer();
 
         // read error and input streams as this would free up the buffers

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
index 706347c..b663f81 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Map;
 
@@ -60,8 +62,9 @@ public class FileStreamSinkTask extends SinkTask {
             outputStream = System.out;
         } else {
             try {
-                outputStream = new PrintStream(new FileOutputStream(filename, true));
-            } catch (FileNotFoundException e) {
+                outputStream = new PrintStream(new FileOutputStream(filename, true), false,
+                    StandardCharsets.UTF_8.name());
+            } catch (FileNotFoundException | UnsupportedEncodingException e) {
                 throw new ConnectException("Couldn't find or create file for FileStreamSinkTask", e);
             }
         }
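
As an aside on the FileStreamSinkTask hunk above: the three-argument PrintStream constructor used here takes the charset by name and declares UnsupportedEncodingException, which is why the catch clause is widened. A self-contained sketch of the same construction (the file name is only illustrative, not from the patch):

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

public class ExplicitEncodingPrintStreamSketch {
    public static void main(String[] args) {
        PrintStream out;
        try {
            // PrintStream(OutputStream, autoFlush, encodingName) pins the charset,
            // but declares UnsupportedEncodingException -- hence the wider catch.
            out = new PrintStream(new FileOutputStream("sink-output.txt", true), false,
                StandardCharsets.UTF_8.name());
        } catch (FileNotFoundException | UnsupportedEncodingException e) {
            throw new RuntimeException("Couldn't find or create output file", e);
        }
        out.println("one UTF-8 encoded line");
        out.close();
    }
}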

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
index b8b6cfd..8edf385 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -64,7 +65,7 @@ public class FileStreamSourceTask extends SourceTask {
             stream = System.in;
             // Tracking offset for stdin doesn't make sense
             streamOffset = null;
-            reader = new BufferedReader(new InputStreamReader(stream));
+            reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
         }
         topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
         if (topic == null)
@@ -99,7 +100,7 @@ public class FileStreamSourceTask extends SourceTask {
                 } else {
                     streamOffset = 0L;
                 }
-                reader = new BufferedReader(new InputStreamReader(stream));
+                reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
                 log.debug("Opened {} for reading", logFilename());
             } catch (FileNotFoundException e) {
                 log.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", logFilename());

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 744c364..6a16185 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -37,6 +37,7 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -339,8 +340,8 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
 
     private String trace(Throwable t) {
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        t.printStackTrace(new PrintStream(output));
         try {
+            t.printStackTrace(new PrintStream(output, false, StandardCharsets.UTF_8.name()));
             return output.toString("UTF-8");
         } catch (UnsupportedEncodingException e) {
             return null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index ce62b7e..7afb4e8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -53,6 +53,7 @@ import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -213,7 +214,7 @@ public class RestServer {
                 connection.setDoOutput(true);
 
                 OutputStream os = connection.getOutputStream();
-                os.write(serializedBody.getBytes());
+                os.write(serializedBody.getBytes(StandardCharsets.UTF_8));
                 os.flush();
                 os.close();
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index d48328d..6c8f252 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -19,6 +19,7 @@ package kafka.coordinator
 
 import java.io.PrintStream
 import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
@@ -1045,10 +1046,10 @@ object GroupMetadataManager {
           val formattedValue =
             if (value == null) "NULL"
             else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
-          output.write(groupTopicPartition.toString.getBytes)
-          output.write("::".getBytes)
-          output.write(formattedValue.getBytes)
-          output.write("\n".getBytes)
+          output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
+          output.write("::".getBytes(StandardCharsets.UTF_8))
+          output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
+          output.write("\n".getBytes(StandardCharsets.UTF_8))
         case _ => // no-op
       }
     }
@@ -1066,10 +1067,10 @@ object GroupMetadataManager {
           val formattedValue =
             if (value == null) "NULL"
             else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
-          output.write(groupId.getBytes)
-          output.write("::".getBytes)
-          output.write(formattedValue.getBytes)
-          output.write("\n".getBytes)
+          output.write(groupId.getBytes(StandardCharsets.UTF_8))
+          output.write("::".getBytes(StandardCharsets.UTF_8))
+          output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
+          output.write("\n".getBytes(StandardCharsets.UTF_8))
         case _ => // no-op
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index 084ed60..de2626c 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -25,6 +25,7 @@ import scala.collection._
 import kafka.utils.{Exit, Logging}
 import kafka.common._
 import java.io._
+import java.nio.charset.StandardCharsets
 
 import org.apache.kafka.common.TopicPartition
 
@@ -47,7 +48,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
     lock synchronized {
       // write to temp file and then swap with the existing file
       val fileOutputStream = new FileOutputStream(tempPath.toFile)
-      val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream))
+      val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
       try {
         writer.write(CurrentVersion.toString)
         writer.newLine()
@@ -83,7 +84,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
       new IOException(s"Malformed line in offset checkpoint file: $line'")
 
     lock synchronized {
-      val reader = new BufferedReader(new FileReader(file))
+      val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))
       var line: String = null
       try {
         line = reader.readLine()
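
A note on the read path changed above: the FileReader constructors available to this code offer no charset parameter, so the only way to pin the encoding is to wrap a FileInputStream in an InputStreamReader. A standalone sketch under that assumption (the checkpoint file name is illustrative):

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class ExplicitEncodingReaderSketch {
    public static void main(String[] args) throws IOException {
        // FileReader would use the platform default charset; wrapping a
        // FileInputStream in an InputStreamReader lets the caller choose UTF-8.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream("offsets.checkpoint"),
                    StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}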

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 2edb88d..56f125a 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -18,8 +18,10 @@
 package kafka.tools
 
 import java.io.PrintStream
+import java.nio.charset.StandardCharsets
 import java.util.concurrent.CountDownLatch
 import java.util.{Locale, Properties, Random}
+
 import joptsimple._
 import kafka.api.OffsetRequest
 import kafka.common.{MessageFormatter, StreamEndException}
@@ -33,6 +35,7 @@ import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.Deserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.log4j.Logger
+
 import scala.collection.JavaConverters._
 
 /**
@@ -412,8 +415,8 @@ object ConsoleConsumer extends Logging {
 class DefaultMessageFormatter extends MessageFormatter {
   var printKey = false
   var printTimestamp = false
-  var keySeparator = "\t".getBytes
-  var lineSeparator = "\n".getBytes
+  var keySeparator = "\t".getBytes(StandardCharsets.UTF_8)
+  var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8)
 
   var keyDeserializer: Option[Deserializer[_]] = None
   var valueDeserializer: Option[Deserializer[_]] = None
@@ -424,9 +427,9 @@ class DefaultMessageFormatter extends MessageFormatter {
     if (props.containsKey("print.key"))
       printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true")
     if (props.containsKey("key.separator"))
-      keySeparator = props.getProperty("key.separator").getBytes
+      keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8)
     if (props.containsKey("line.separator"))
-      lineSeparator = props.getProperty("line.separator").getBytes
+      lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)
     // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
     if (props.containsKey("key.deserializer"))
       keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
@@ -438,8 +441,9 @@ class DefaultMessageFormatter extends MessageFormatter {
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
 
     def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], separator: Array[Byte]) {
-      val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes)
-      val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.getBytes).getOrElse(nonNullBytes)
+      val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
+      val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.
+        getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
       output.write(convertedBytes)
       output.write(separator)
     }
@@ -448,9 +452,9 @@ class DefaultMessageFormatter extends MessageFormatter {
 
     if (printTimestamp) {
       if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
-        output.write(s"$timestampType:$timestamp".getBytes)
+        output.write(s"$timestampType:$timestamp".getBytes(StandardCharsets.UTF_8))
       else
-        output.write(s"NO_TIMESTAMP".getBytes)
+        output.write(s"NO_TIMESTAMP".getBytes(StandardCharsets.UTF_8))
       output.write(keySeparator)
     }
 
@@ -470,8 +474,8 @@ class LoggingMessageFormatter extends MessageFormatter   {
     defaultWriter.writeTo(consumerRecord, output)
     if (logger.isInfoEnabled)
       logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} +
-                  s"key:${if (key == null) "null" else new String(key)}, " +
-                  s"value:${if (value == null) "null" else new String(value)}")
+                  s"key:${if (key == null) "null" else new String(key, StandardCharsets.UTF_8)}, " +
+                  s"value:${if (value == null) "null" else new String(value, StandardCharsets.UTF_8)}")
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 6d99d99..ffb3458 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -24,6 +24,7 @@ import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
 import kafka.producer.{NewShinyProducer, OldProducer}
 import java.util.Properties
 import java.io._
+import java.nio.charset.StandardCharsets
 
 import joptsimple._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
@@ -301,7 +302,7 @@ object ConsoleProducer {
         keySeparator = props.getProperty("key.separator")
       if (props.containsKey("ignore.error"))
         ignoreError = props.getProperty("ignore.error").trim.equalsIgnoreCase("true")
-      reader = new BufferedReader(new InputStreamReader(inputStream))
+      reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
     }
 
     override def readMessage() = {
@@ -312,14 +313,14 @@ object ConsoleProducer {
         case (line, true) =>
           line.indexOf(keySeparator) match {
             case -1 =>
-              if (ignoreError) new ProducerRecord(topic, line.getBytes)
+              if (ignoreError) new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
               else throw new KafkaException(s"No key found on line $lineNumber: $line")
             case n =>
-              val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes
-              new ProducerRecord(topic, line.substring(0, n).getBytes, value)
+              val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes(StandardCharsets.UTF_8)
+              new ProducerRecord(topic, line.substring(0, n).getBytes(StandardCharsets.UTF_8), value)
           }
         case (line, false) =>
-          new ProducerRecord(topic, line.getBytes)
+          new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/core/src/main/scala/kafka/tools/EndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 06b72f8..3beaf82 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -17,6 +17,7 @@
 
 package kafka.tools
 
+import java.nio.charset.StandardCharsets
 import java.util.{Arrays, Collections, Properties}
 
 import kafka.utils.Exit
@@ -113,8 +114,8 @@ object EndToEndLatency {
       }
 
       //Check result matches the original record
-      val sent = new String(message)
-      val read = new String(recordIter.next().value())
+      val sent = new String(message, StandardCharsets.UTF_8)
+      val read = new String(recordIter.next().value(), StandardCharsets.UTF_8)
       if (!read.equals(sent)) {
         finalise()
         throw new RuntimeException(s"The message read [$read] did not match the message sent [$sent]")

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index 9e605ab..eeae270 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -17,7 +17,8 @@
 
 package kafka.tools
 
-import java.io.FileWriter
+import java.io.{FileOutputStream, FileWriter, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
 
 import joptsimple._
 import kafka.utils.{CommandLineUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils}
@@ -76,7 +77,8 @@ object ExportZkOffsets extends Logging {
     val outfile    = options.valueOf(outFileOpt)
 
     var zkUtils   : ZkUtils    = null
-    val fileWriter : FileWriter  = new FileWriter(outfile)
+    val fileWriter : OutputStreamWriter =
+        new OutputStreamWriter(new FileOutputStream(outfile), StandardCharsets.UTF_8)
     
     try {
       zkUtils = ZkUtils(zkConnect,

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index ded23a4..bb9a65d 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -17,8 +17,8 @@
 
 package kafka.tools
 
-import java.io.BufferedReader
-import java.io.FileReader
+import java.io.{BufferedReader, FileInputStream, InputStreamReader}
+import java.nio.charset.StandardCharsets
 
 import joptsimple._
 import kafka.utils.{CommandLineUtils, Exit, Logging, ZkUtils}
@@ -77,7 +77,7 @@ object ImportZkOffsets extends Logging {
   }
 
   private def getPartitionOffsetsFromFile(filename: String):Map[String,String] = {
-    val fr = new FileReader(filename)
+    val fr = new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8)
     val br = new BufferedReader(fr)
     var partOffsetsMap: Map[String,String] = Map()
     

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index 3808627..f14253b 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong
 import java.util._
 import java.text.SimpleDateFormat
 import java.math.BigInteger
+import java.nio.charset.StandardCharsets
 
 import org.apache.kafka.common.utils.Utils
 import org.apache.log4j.Logger
@@ -245,7 +246,7 @@ object ProducerPerformance extends Logging {
 
       val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x')
       debug(seqMsgString)
-      seqMsgString.getBytes()
+      seqMsgString.getBytes(StandardCharsets.UTF_8)
     }
 
     private def generateProducerData(topic: String, messageId: Long): Array[Byte] = {
@@ -276,7 +277,7 @@ object ProducerPerformance extends Logging {
                 Thread.sleep(config.messageSendGapMs)
             })
         } catch {
-          case e: Throwable => error("Error when sending message " + new String(message), e)
+          case e: Throwable => error("Error when sending message " + new String(message, StandardCharsets.UTF_8), e)
         }
         i += 1
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index c8d6710..0bd749a 100755
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -27,6 +27,7 @@ import java.text.SimpleDateFormat
 import kafka.utils.{CommandLineUtils, CoreUtils, Exit, Logging}
 import kafka.common.Topic
 import java.io.{BufferedOutputStream, OutputStream}
+import java.nio.charset.StandardCharsets
 
 /**
  * A utility that merges the state change logs (possibly obtained from different brokers and over multiple days).
@@ -147,7 +148,7 @@ object StateChangeLogMerger extends Logging {
 
     while (pqueue.nonEmpty) {
       val lineItr = pqueue.dequeue()
-      output.write((lineItr.line + "\n").getBytes)
+      output.write((lineItr.line + "\n").getBytes(StandardCharsets.UTF_8))
       val nextLineItr = getNextLine(lineItr.itr)
       if (!nextLineItr.isEmpty)
         pqueue.enqueue(nextLineItr)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
index cb36964..469fbff 100644
--- a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
@@ -36,7 +36,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
   @Test(timeout=120000)
   def checkBrokerApiVersionCommandOutput() {
     val byteArrayOutputStream = new ByteArrayOutputStream
-    val printStream = new PrintStream(byteArrayOutputStream)
+    val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())
     BrokerApiVersionsCommand.execute(Array("--bootstrap-server", brokerList), printStream)
     val content = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
     val lineIter = content.split("\n").iterator

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index da3f651..42e3b11 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.api
 
+import java.nio.charset.StandardCharsets
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 
@@ -107,9 +108,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
           assertEquals(topic, metadata.topic())
           assertEquals(partition, metadata.partition())
           offset match {
-            case 0 => assertEquals(metadata.serializedKeySize + metadata.serializedValueSize, "key".getBytes.length + "value".getBytes.length)
-            case 1 => assertEquals(metadata.serializedKeySize(), "key".getBytes.length)
-            case 2 => assertEquals(metadata.serializedValueSize, "value".getBytes.length)
+            case 0 => assertEquals(metadata.serializedKeySize + metadata.serializedValueSize,
+              "key".getBytes(StandardCharsets.UTF_8).length + "value".getBytes(StandardCharsets.UTF_8).length)
+            case 1 => assertEquals(metadata.serializedKeySize(), "key".getBytes(StandardCharsets.UTF_8).length)
+            case 2 => assertEquals(metadata.serializedValueSize, "value".getBytes(StandardCharsets.UTF_8).length)
             case _ => assertTrue(metadata.serializedValueSize > 0)
           }
           assertNotEquals(metadata.checksum(), 0)
@@ -125,24 +127,27 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
 
       // send a normal record
-      val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes, "value".getBytes)
+      val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes(StandardCharsets.UTF_8),
+        "value".getBytes(StandardCharsets.UTF_8))
       assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset)
 
       // send a record with null value should be ok
-      val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes, null)
+      val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes(StandardCharsets.UTF_8), null)
       assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset)
 
       // send a record with null key should be ok
-      val record2 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes)
+      val record2 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes(StandardCharsets.UTF_8))
       assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset)
 
       // send a record with null part id should be ok
-      val record3 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
+      val record3 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes(StandardCharsets.UTF_8),
+        "value".getBytes(StandardCharsets.UTF_8))
       assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset)
 
       // send a record with null topic should fail
       try {
-        val record4 = new ProducerRecord[Array[Byte], Array[Byte]](null, partition, "key".getBytes, "value".getBytes)
+        val record4 = new ProducerRecord[Array[Byte], Array[Byte]](null, partition, "key".getBytes(StandardCharsets.UTF_8),
+          "value".getBytes(StandardCharsets.UTF_8))
         producer.send(record4, callback)
         fail("Should not allow sending a record without topic")
       } catch {
@@ -183,7 +188,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
 
       val recordAndFutures = for (i <- 1 to numRecords) yield {
-        val record = new ProducerRecord(topic, partition, s"key$i".getBytes, s"value$i".getBytes)
+        val record = new ProducerRecord(topic, partition, s"key$i".getBytes(StandardCharsets.UTF_8),
+          s"value$i".getBytes(StandardCharsets.UTF_8))
         (record, producer.send(record))
       }
       producer.close(timeoutMs, TimeUnit.MILLISECONDS)
@@ -237,7 +243,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       TestUtils.createTopic(zkUtils, topic, 1, 2, servers, topicProps)
 
       val recordAndFutures = for (i <- 1 to numRecords) yield {
-        val record = new ProducerRecord(topic, partition, baseTimestamp + i, s"key$i".getBytes, s"value$i".getBytes)
+        val record = new ProducerRecord(topic, partition, baseTimestamp + i, s"key$i".getBytes(StandardCharsets.UTF_8),
+          s"value$i".getBytes(StandardCharsets.UTF_8))
         (record, producer.send(record, callback))
       }
       producer.close(20000L, TimeUnit.MILLISECONDS)
@@ -268,7 +275,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
 
       // non-blocking send a list of records
-      val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
+      val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes(StandardCharsets.UTF_8),
+        "value".getBytes(StandardCharsets.UTF_8))
       for (_ <- 1 to numRecords)
         producer.send(record0)
       val response0 = producer.send(record0)
@@ -301,7 +309,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
       val now = System.currentTimeMillis()
       val futures = (1 to numRecords).map { i =>
-        producer.send(new ProducerRecord(topic, partition, now, null, ("value" + i).getBytes))
+        producer.send(new ProducerRecord(topic, partition, now, null, ("value" + i).getBytes(StandardCharsets.UTF_8)))
       }.map(_.get(30, TimeUnit.SECONDS))
 
       // make sure all of them end up in the same partition with increasing offset values
@@ -345,7 +353,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val partition0 = 0
 
     var futures0 = (1 to numRecords).map { i =>
-      producer.send(new ProducerRecord(topic, partition0, null, ("value" + i).getBytes))
+      producer.send(new ProducerRecord(topic, partition0, null, ("value" + i).getBytes(StandardCharsets.UTF_8)))
     }.map(_.get(30, TimeUnit.SECONDS))
 
     // make sure all of them end up in the same partition with increasing offset values
@@ -358,7 +366,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     // Trying to send a record to a partition beyond topic's partition range before adding the partition should fail.
     val partition1 = 1
     try {
-      producer.send(new ProducerRecord(topic, partition1, null, "value".getBytes))
+      producer.send(new ProducerRecord(topic, partition1, null, "value".getBytes(StandardCharsets.UTF_8)))
       fail("Should not allow sending a record to a partition not present in the metadata")
     } catch {
       case _: KafkaException => // this is ok
@@ -371,7 +379,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
     // send records to the newly added partition after confirming that metadata have been updated.
     val futures1 = (1 to numRecords).map { i =>
-      producer.send(new ProducerRecord(topic, partition1, null, ("value" + i).getBytes))
+      producer.send(new ProducerRecord(topic, partition1, null, ("value" + i).getBytes(StandardCharsets.UTF_8)))
     }.map(_.get(30, TimeUnit.SECONDS))
 
     // make sure all of them end up in the same partition with increasing offset values
@@ -382,7 +390,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     }
 
     futures0 = (1 to numRecords).map { i =>
-      producer.send(new ProducerRecord(topic, partition0, null, ("value" + i).getBytes))
+      producer.send(new ProducerRecord(topic, partition0, null, ("value" + i).getBytes(StandardCharsets.UTF_8)))
     }.map(_.get(30, TimeUnit.SECONDS))
 
     // make sure all of them end up in the same partition with increasing offset values starting where previous
@@ -401,7 +409,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
     try {
       TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
-      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes)
+      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
+        "value".getBytes(StandardCharsets.UTF_8))
       for (_ <- 0 until 50) {
         val responses = (0 until numRecords) map (_ => producer.send(record))
         assertTrue("No request is complete.", responses.forall(!_.isDone()))
@@ -421,7 +430,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
     val partition = 0
     consumer.assign(List(new TopicPartition(topic, partition)).asJava)
-    val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes)
+    val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null,
+      "value".getBytes(StandardCharsets.UTF_8))
 
     // Test closing from caller thread.
     for (_ <- 0 until 50) {
@@ -450,7 +460,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
     val partition = 0
     consumer.assign(List(new TopicPartition(topic, partition)).asJava)
-    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes(StandardCharsets.UTF_8))
 
     // Test closing from sender thread.
     class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]], sendRecords: Boolean) extends Callback {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
index c19127e..9a20a1d 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
@@ -16,11 +16,13 @@
  */
 package kafka.admin
 
+import java.nio.charset.StandardCharsets
+
 import kafka.utils._
 import kafka.server.KafkaConfig
 import org.junit.Test
 import kafka.consumer._
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import kafka.integration.KafkaServerTestHarness
 
 
@@ -201,7 +203,7 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
   }
 
   private def produceEvents(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, messages: List[String]) {
-    messages.foreach(message => producer.send(new ProducerRecord(topic, message.getBytes)))
+    messages.foreach(message => producer.send(new ProducerRecord(topic, message.getBytes(StandardCharsets.UTF_8))))
   }
 
   private def consumeEvents(messageStream: KafkaStream[Array[Byte], Array[Byte]], n: Int) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
----------------------------------------------------------------------
diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
index fa0adf8..513052e 100644
--- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
+++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
@@ -29,6 +29,7 @@ import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.spi.LoggingEvent;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Date;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
@@ -249,7 +250,8 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
     protected void append(LoggingEvent event) {
         String message = subAppend(event);
         LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
-        Future<RecordMetadata> response = producer.send(new ProducerRecord<byte[], byte[]>(topic, message.getBytes()));
+        Future<RecordMetadata> response = producer.send(
+            new ProducerRecord<byte[], byte[]>(topic, message.getBytes(StandardCharsets.UTF_8)));
         if (syncSend) {
             try {
                 response.get();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 79abbb5..5eabc2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -40,9 +40,10 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.Stores;
 
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Array;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Objects;
@@ -224,11 +225,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         String name = topology.newName(PRINTING_NAME);
         streamName = (streamName == null) ? this.name : streamName;
         try {
-
-            PrintStream printStream = new PrintStream(new FileOutputStream(filePath));
-            topology.addProcessor(name, new KeyValuePrinter<>(printStream, keySerde, valSerde, streamName), this.name);
-
-        } catch (FileNotFoundException e) {
+            PrintWriter printWriter = null;
+            printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name());
+            topology.addProcessor(name, new KeyValuePrinter<>(printWriter, keySerde, valSerde, streamName), this.name);
+        } catch (FileNotFoundException | UnsupportedEncodingException e) {
             String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
             throw new TopologyBuilderException(message);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 3f80143..6120f91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -32,8 +32,9 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 import java.util.Set;
 
@@ -170,9 +171,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         String name = topology.newName(PRINTING_NAME);
         streamName = (streamName == null) ? this.name : streamName;
         try {
-            PrintStream printStream = new PrintStream(new FileOutputStream(filePath));
-            topology.addProcessor(name, new KeyValuePrinter<>(printStream, keySerde, valSerde, streamName), this.name);
-        } catch (FileNotFoundException e) {
+            PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name());
+            topology.addProcessor(name, new KeyValuePrinter<>(printWriter, keySerde, valSerde, streamName), this.name);
+        } catch (FileNotFoundException | UnsupportedEncodingException e) {
             String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
             throw new TopologyBuilderException(message);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
index e193e52..c1d4382 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
@@ -23,51 +23,44 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-import java.io.PrintStream;
+import java.io.PrintWriter;
 
 
 class KeyValuePrinter<K, V> implements ProcessorSupplier<K, V> {
-
-    private final PrintStream printStream;
+    private final PrintWriter printWriter;
     private Serde<?> keySerde;
     private Serde<?> valueSerde;
     private String streamName;
 
-
-    KeyValuePrinter(PrintStream printStream, Serde<?> keySerde, Serde<?> valueSerde, String streamName) {
+    KeyValuePrinter(PrintWriter printWriter, Serde<?> keySerde, Serde<?> valueSerde, String streamName) {
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         this.streamName = streamName;
-        if (printStream == null) {
-            this.printStream = System.out;
-        } else {
-            this.printStream = printStream;
-        }
+        this.printWriter = printWriter;
     }
 
-    KeyValuePrinter(PrintStream printStream, String streamName) {
-        this(printStream, null, null, streamName);
+    KeyValuePrinter(PrintWriter printWriter, String streamName) {
+        this(printWriter, null, null, streamName);
     }
 
     KeyValuePrinter(Serde<?> keySerde, Serde<?> valueSerde, String streamName) {
-        this(System.out, keySerde, valueSerde, streamName);
+        this(null, keySerde, valueSerde, streamName);
     }
 
     @Override
     public Processor<K, V> get() {
-        return new KeyValuePrinterProcessor(this.printStream, this.keySerde, this.valueSerde, this.streamName);
+        return new KeyValuePrinterProcessor(this.printWriter, this.keySerde, this.valueSerde, this.streamName);
     }
 
-
     private class KeyValuePrinterProcessor extends AbstractProcessor<K, V> {
-        private final PrintStream printStream;
+        private final PrintWriter printWriter;
         private Serde<?> keySerde;
         private Serde<?> valueSerde;
         private ProcessorContext processorContext;
         private String streamName;
 
-        private KeyValuePrinterProcessor(PrintStream printStream, Serde<?> keySerde, Serde<?> valueSerde, String streamName) {
-            this.printStream = printStream;
+        private KeyValuePrinterProcessor(PrintWriter printWriter, Serde<?> keySerde, Serde<?> valueSerde, String streamName) {
+            this.printWriter = printWriter;
             this.keySerde = keySerde;
             this.valueSerde = valueSerde;
             this.streamName = streamName;
@@ -91,11 +84,17 @@ class KeyValuePrinter<K, V> implements ProcessorSupplier<K, V> {
             K keyToPrint = (K) maybeDeserialize(key, keySerde.deserializer());
             V valueToPrint = (V) maybeDeserialize(value, valueSerde.deserializer());
 
-            printStream.println("[" + this.streamName + "]: " + keyToPrint + " , " + valueToPrint);
+            println("[" + this.streamName + "]: " + keyToPrint + " , " + valueToPrint);
 
             this.processorContext.forward(key, value);
         }
 
+        private void println(String str) {
+            if (printWriter == null)
+                System.out.println(str);
+            else
+                printWriter.println(str);
+        }
 
         private Object maybeDeserialize(Object receivedElement, Deserializer<?> deserializer) {
             if (receivedElement == null) {
@@ -111,10 +110,10 @@ class KeyValuePrinter<K, V> implements ProcessorSupplier<K, V> {
 
         @Override
         public void close() {
-            if (this.printStream == System.out) {
-                this.printStream.flush();
+            if (printWriter == null) {
+                System.out.flush();
             } else {
-                this.printStream.close();
+                printWriter.close();
             }
         }
     }
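
The KeyValuePrinter rewrite above swaps PrintStream for PrintWriter and uses a null writer as a sentinel for "print to System.out", so close() can flush stdout without ever closing it. A condensed sketch of that convention (class and method names here are illustrative, not the patch's API):

import java.io.PrintWriter;

public class SentinelPrinterSketch {
    private final PrintWriter printWriter;   // null means "print to System.out"

    SentinelPrinterSketch(PrintWriter printWriter) {
        this.printWriter = printWriter;
    }

    void println(String str) {
        if (printWriter == null)
            System.out.println(str);
        else
            printWriter.println(str);
    }

    void close() {
        // Never close System.out, only flush it; a caller-supplied writer is
        // owned by this printer and is closed here.
        if (printWriter == null)
            System.out.flush();
        else
            printWriter.close();
    }

    public static void main(String[] args) {
        SentinelPrinterSketch printer = new SentinelPrinterSketch(null);
        printer.println("[stream-name]: key , value");
        printer.close();
    }
}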

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index b676421..54d2165 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -23,11 +23,13 @@ import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.EOFException;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
-import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.Collections;
 import java.util.HashMap;
@@ -69,8 +71,8 @@ public class OffsetCheckpoint {
             File temp = new File(file.getAbsolutePath() + ".tmp");
 
             FileOutputStream fileOutputStream = new FileOutputStream(temp);
-            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream));
-            try {
+            try (BufferedWriter writer = new BufferedWriter(
+                    new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
                 writeIntLine(writer, VERSION);
                 writeIntLine(writer, offsets.size());
 
@@ -79,8 +81,6 @@ public class OffsetCheckpoint {
 
                 writer.flush();
                 fileOutputStream.getFD().sync();
-            } finally {
-                writer.close();
             }
 
             Utils.atomicMoveWithFallback(temp.toPath(), file.toPath());
@@ -116,7 +116,7 @@ public class OffsetCheckpoint {
         synchronized (lock) {
             BufferedReader reader;
             try {
-                reader = new BufferedReader(new FileReader(file));
+                reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8));
             } catch (FileNotFoundException e) {
                 return Collections.emptyMap();
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2fcdd6/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
index 6c0162c..7de2b8b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
@@ -26,11 +26,14 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -39,11 +42,18 @@ public class KeyValuePrinterProcessorTest {
 
     private final String topicName = "topic";
     private final Serde<String> stringSerde = Serdes.String();
-    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    private final KStreamBuilder builder = new KStreamBuilder();
-    private final PrintStream printStream = new PrintStream(baos);
-
-    private KStreamTestDriver driver = null;
+    private ByteArrayOutputStream baos;
+    private KStreamBuilder builder;
+    private PrintWriter printWriter;
+    private KStreamTestDriver driver;
+
+    @Before
+    public void setup() {
+        baos = new ByteArrayOutputStream();
+        builder = new KStreamBuilder();
+        printWriter = new PrintWriter(new OutputStreamWriter(baos, StandardCharsets.UTF_8));
+        driver = null;
+    }
 
     @After
     public void cleanup() {
@@ -56,7 +66,7 @@ public class KeyValuePrinterProcessorTest {
     @Test
     public void testPrintKeyValueDefaultSerde() throws Exception {
 
-        KeyValuePrinter<String, String> keyValuePrinter = new KeyValuePrinter<>(printStream, null);
+        KeyValuePrinter<String, String> keyValuePrinter = new KeyValuePrinter<>(printWriter, null);
         String[] suppliedKeys = {"foo", "bar", null};
         String[] suppliedValues = {"value1", "value2", "value3"};
         String[] expectedValues = {"[null]: foo , value1", "[null]: bar , value2", "[null]: null , value3"};
@@ -69,7 +79,7 @@ public class KeyValuePrinterProcessorTest {
         for (int i = 0; i < suppliedKeys.length; i++) {
             driver.process(topicName, suppliedKeys[i], suppliedValues[i]);
         }
-
+        printWriter.flush();
         String[] capturedValues = new String(baos.toByteArray(), Charset.forName("UTF-8")).split("\n");
 
         for (int i = 0; i < capturedValues.length; i++) {
@@ -80,7 +90,7 @@ public class KeyValuePrinterProcessorTest {
     @Test
     public void testPrintKeyValuesWithName() throws Exception {
 
-        KeyValuePrinter<String, String> keyValuePrinter = new KeyValuePrinter<>(printStream, "test-stream");
+        KeyValuePrinter<String, String> keyValuePrinter = new KeyValuePrinter<>(printWriter, "test-stream");
         String[] suppliedKeys = {"foo", "bar", null};
         String[] suppliedValues = {"value1", "value2", "value3"};
         String[] expectedValues = {"[test-stream]: foo , value1", "[test-stream]: bar , value2", "[test-stream]: null , value3"};
@@ -93,7 +103,7 @@ public class KeyValuePrinterProcessorTest {
         for (int i = 0; i < suppliedKeys.length; i++) {
             driver.process(topicName, suppliedKeys[i], suppliedValues[i]);
         }
-
+        printWriter.flush();
         String[] capturedValues = new String(baos.toByteArray(), Charset.forName("UTF-8")).split("\n");
 
         for (int i = 0; i < capturedValues.length; i++) {
@@ -106,7 +116,7 @@ public class KeyValuePrinterProcessorTest {
     public void testPrintKeyValueWithProvidedSerde() throws Exception {
 
         Serde<MockObject> mockObjectSerde = Serdes.serdeFrom(new MockSerializer(), new MockDeserializer());
-        KeyValuePrinter<String, MockObject> keyValuePrinter = new KeyValuePrinter<>(printStream, stringSerde, mockObjectSerde, null);
+        KeyValuePrinter<String, MockObject> keyValuePrinter = new KeyValuePrinter<>(printWriter, stringSerde, mockObjectSerde, null);
         KStream<String, MockObject> stream = builder.stream(stringSerde, mockObjectSerde, topicName);
 
         stream.process(keyValuePrinter);
@@ -117,10 +127,10 @@ public class KeyValuePrinterProcessorTest {
         byte[] suppliedValue = "{\"name\":\"print\", \"label\":\"test\"}".getBytes(Charset.forName("UTF-8"));
 
         driver.process(topicName, suppliedKey, suppliedValue);
-        String expectedPrintedValue = "[null]: null , name:print label:test";
+        printWriter.flush();
         String capturedValue = new String(baos.toByteArray(), Charset.forName("UTF-8")).trim();
 
-        assertEquals(capturedValue, expectedPrintedValue);
+        assertEquals("[null]: null , name:print label:test", capturedValue);
 
     }
 

