kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: MINOR: Avoid FileInputStream/FileOutputStream (#5281)
Date Wed, 27 Jun 2018 08:00:22 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7a74ec6  MINOR: Avoid FileInputStream/FileOutputStream (#5281)
7a74ec6 is described below

commit 7a74ec62d291bf344b308115bd115a6788074a93
Author: Ismael Juma <ismael@juma.me.uk>
AuthorDate: Wed Jun 27 01:00:05 2018 -0700

    MINOR: Avoid FileInputStream/FileOutputStream (#5281)
    
    FileInputStream and FileOutputStream rely on finalizers (before
    Java 11), which create unnecessary GC load. The alternatives are
    as easy to use and don't have this issue.
    
    Also use FileChannel directly instead of retrieving
    it from RandomAccessFile whenever possible
    since the indirection is unnecessary.
    
    Finally, add a few try/finally blocks.
    
    Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../common/config/provider/FileConfigProvider.java   |  8 +++-----
 .../org/apache/kafka/common/record/FileRecords.java  | 19 ++++++++-----------
 .../apache/kafka/common/security/ssl/SslFactory.java |  6 ++++--
 .../java/org/apache/kafka/common/utils/Utils.java    |  7 +++----
 .../common/record/LazyDownConversionRecordsTest.java |  4 ++--
 .../java/org/apache/kafka/test/TestSslUtils.java     | 10 ++++++----
 .../kafka/connect/file/FileStreamSinkTask.java       | 11 ++++++-----
 .../kafka/connect/file/FileStreamSourceTask.java     | 14 +++++++++-----
 .../kafka/connect/file/FileStreamSourceTaskTest.java |  7 ++++---
 .../connect/storage/FileOffsetBackingStore.java      | 13 ++++++-------
 core/src/main/scala/kafka/log/AbstractIndex.scala    |  4 ++--
 .../main/scala/kafka/log/ProducerStateManager.scala  | 14 ++++++--------
 core/src/main/scala/kafka/log/TransactionIndex.scala |  4 ++--
 .../kafka/server/BrokerMetadataCheckpoint.scala      |  5 +++--
 .../kafka/server/checkpoints/CheckpointFile.scala    |  2 +-
 core/src/main/scala/kafka/utils/FileLock.scala       |  8 +++-----
 .../api/SaslClientsWithInvalidCredentialsTest.scala  |  4 ++--
 .../server/DynamicBrokerReconfigurationTest.scala    | 12 ++++++------
 .../scala/other/kafka/ReplicationQuotasTestRig.scala |  7 ++++---
 .../scala/other/kafka/TestLinearWriteSpeed.scala     |  9 ++++++---
 core/src/test/scala/other/kafka/TestTruncate.scala   |  4 +++-
 core/src/test/scala/unit/kafka/KafkaConfigTest.scala | 20 ++++++++++----------
 .../FetchRequestDownConversionConfigTest.scala       |  3 +--
 .../scala/unit/kafka/tools/ConsoleConsumerTest.scala | 13 +++++++------
 core/src/test/scala/unit/kafka/utils/TestUtils.scala | 17 ++++++++++-------
 .../org/apache/kafka/streams/kstream/Printed.java    |  9 +++++----
 .../streams/state/internals/OffsetCheckpoint.java    | 10 +++-------
 .../apache/kafka/streams/kstream/PrintedTest.java    |  7 ++++---
 .../internals/GlobalStateManagerImplTest.java        |  5 +++--
 .../apache/kafka/tools/VerifiableLog4jAppender.java  |  8 ++++----
 .../org/apache/kafka/tools/VerifiableProducer.java   |  8 ++++----
 .../kafka/trogdor/basic/BasicPlatformTest.java       |  3 +--
 32 files changed, 141 insertions(+), 134 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java
index d25183a..4e376ec 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java
@@ -19,12 +19,10 @@ package org.apache.kafka.common.config.provider;
 import org.apache.kafka.common.config.ConfigData;
 import org.apache.kafka.common.config.ConfigException;
 
-import java.io.BufferedReader;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.Reader;
-import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
@@ -97,7 +95,7 @@ public class FileConfigProvider implements ConfigProvider {
 
     // visible for testing
     protected Reader reader(String path) throws IOException {
-        return new BufferedReader(new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8));
+        return Files.newBufferedReader(Paths.get(path));
     }
 
     public void close() {
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index df38ac7..0aa9f46 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -25,13 +25,13 @@ import org.apache.kafka.common.utils.Utils;
 
 import java.io.Closeable;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -426,19 +426,16 @@ public class FileRecords extends AbstractRecords implements Closeable {
                                            int initFileSize,
                                            boolean preallocate) throws IOException {
         if (mutable) {
-            if (fileAlreadyExists) {
-                return new RandomAccessFile(file, "rw").getChannel();
+            if (fileAlreadyExists || !preallocate) {
+                return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ,
+                        StandardOpenOption.WRITE);
             } else {
-                if (preallocate) {
-                    RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
-                    randomAccessFile.setLength(initFileSize);
-                    return randomAccessFile.getChannel();
-                } else {
-                    return new RandomAccessFile(file, "rw").getChannel();
-                }
+                RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+                randomAccessFile.setLength(initFileSize);
+                return randomAccessFile.getChannel();
             }
         } else {
-            return new FileInputStream(file).getChannel();
+            return FileChannel.open(file.toPath());
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 055404c..61f2f0d 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -33,9 +33,11 @@ import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLParameters;
 import javax.net.ssl.TrustManagerFactory;
-import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.security.Principal;
@@ -320,7 +322,7 @@ public class SslFactory implements Reconfigurable {
          *   using the specified configs (e.g. if the password or keystore type is invalid)
          */
         KeyStore load() {
-            try (FileInputStream in = new FileInputStream(path)) {
+            try (InputStream in = Files.newInputStream(Paths.get(path))) {
                 KeyStore ks = KeyStore.getInstance(type);
                 // If a password is not set access to the truststore is still available, but integrity checking is disabled.
                 char[] passwordChars = password != null ? password.value().toCharArray() : null;
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 31fa01c..b541833 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -24,7 +24,6 @@ import java.io.Closeable;
 import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintWriter;
@@ -41,6 +40,7 @@ import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
+import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.text.DecimalFormat;
@@ -520,7 +520,7 @@ public final class Utils {
         Properties props = new Properties();
 
         if (filename != null) {
-            try (InputStream propStream = new FileInputStream(filename)) {
+            try (InputStream propStream = Files.newInputStream(Paths.get(filename))) {
                 props.load(propStream);
             }
         } else {
@@ -590,8 +590,7 @@ public final class Utils {
     public static String readFileAsString(String path, Charset charset) throws IOException {
         if (charset == null) charset = Charset.defaultCharset();
 
-        try (FileInputStream stream = new FileInputStream(new File(path))) {
-            FileChannel fc = stream.getChannel();
+        try (FileChannel fc = FileChannel.open(Paths.get(path))) {
             MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
             return charset.decode(bb).toString();
         }
diff --git a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
index 89c1aea..b233793 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
@@ -26,9 +26,9 @@ import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -160,7 +160,7 @@ public class LazyDownConversionRecordsTest {
                     inputRecords, toMagic, 0L, Time.SYSTEM);
             LazyDownConversionRecordsSend lazySend = lazyRecords.toSend("foo");
             File outputFile = tempFile();
-            FileChannel channel = new RandomAccessFile(outputFile, "rw").getChannel();
+            FileChannel channel = FileChannel.open(outputFile.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE);
 
             int written = 0;
             while (written < bytesToConvert)
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index 653ac82..90b6d8d 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -20,15 +20,17 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.network.Mode;
 
 import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.EOFException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.math.BigInteger;
 import java.net.InetAddress;
 
 import javax.net.ssl.TrustManagerFactory;
 
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.security.GeneralSecurityException;
 import java.security.Key;
 import java.security.KeyPair;
@@ -99,7 +101,7 @@ public class TestSslUtils {
 
     private static void saveKeyStore(KeyStore ks, String filename,
                                      Password password) throws GeneralSecurityException, IOException {
-        try (FileOutputStream out = new FileOutputStream(filename)) {
+        try (OutputStream out = Files.newOutputStream(Paths.get(filename))) {
             ks.store(out, password.value().toCharArray());
         }
     }
@@ -137,7 +139,7 @@ public class TestSslUtils {
     public static <T extends Certificate> void createTrustStore(
             String filename, Password password, Map<String, T> certs) throws GeneralSecurityException, IOException {
         KeyStore ks = KeyStore.getInstance("JKS");
-        try (FileInputStream in = new FileInputStream(filename)) {
+        try (InputStream in = Files.newInputStream(Paths.get(filename))) {
             ks.load(in, password.value().toCharArray());
         } catch (EOFException e) {
             ks = createEmptyKeyStore();
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
index b663f81..328dee6 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
@@ -24,11 +24,12 @@ import org.apache.kafka.connect.sink.SinkTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
 import java.util.Collection;
 import java.util.Map;
 
@@ -62,9 +63,9 @@ public class FileStreamSinkTask extends SinkTask {
             outputStream = System.out;
         } else {
             try {
-                outputStream = new PrintStream(new FileOutputStream(filename, true), false,
+                outputStream = new PrintStream(Files.newOutputStream(Paths.get(filename), StandardOpenOption.APPEND), false,
                     StandardCharsets.UTF_8.name());
-            } catch (FileNotFoundException | UnsupportedEncodingException e) {
+            } catch (IOException e) {
                 throw new ConnectException("Couldn't find or create file for FileStreamSinkTask", e);
             }
         }
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
index 17037f2..582889b 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
@@ -17,12 +17,13 @@
 package org.apache.kafka.connect.file;
 
 import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -78,7 +79,7 @@ public class FileStreamSourceTask extends SourceTask {
     public List<SourceRecord> poll() throws InterruptedException {
         if (stream == null) {
             try {
-                stream = new FileInputStream(filename);
+                stream = Files.newInputStream(Paths.get(filename));
                 Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
                 if (offset != null) {
                     Object lastRecordedOffset = offset.get(POSITION_FIELD);
@@ -92,7 +93,7 @@ public class FileStreamSourceTask extends SourceTask {
                                 long skipped = stream.skip(skipLeft);
                                 skipLeft -= skipped;
                             } catch (IOException e) {
-                                log.error("Error while trying to seek to previous offset in file: ", e);
+                                log.error("Error while trying to seek to previous offset in file {}: ", filename, e);
                                 throw new ConnectException(e);
                             }
                         }
@@ -104,12 +105,15 @@ public class FileStreamSourceTask extends SourceTask {
                 }
                 reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
                 log.debug("Opened {} for reading", logFilename());
-            } catch (FileNotFoundException e) {
+            } catch (NoSuchFileException e) {
                 log.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", logFilename());
                 synchronized (this) {
                     this.wait(1000);
                 }
                 return null;
+            } catch (IOException e) {
+                log.error("Error while trying to open file {}: ", filename, e);
+                throw new ConnectException(e);
             }
         }
 
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
index c1c0a74..feacf8f 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
@@ -27,8 +27,9 @@ import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -81,7 +82,7 @@ public class FileStreamSourceTaskTest extends EasyMockSupport {
 
         task.start(config);
 
-        FileOutputStream os = new FileOutputStream(tempFile);
+        OutputStream os = Files.newOutputStream(tempFile.toPath());
         assertEquals(null, task.poll());
         os.write("partial line".getBytes());
         os.flush();
@@ -135,7 +136,7 @@ public class FileStreamSourceTaskTest extends EasyMockSupport {
         config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "5000");
         task.start(config);
 
-        FileOutputStream os = new FileOutputStream(tempFile);
+        OutputStream os = Files.newOutputStream(tempFile.toPath());
         for (int i = 0; i < 10_000; i++) {
             os.write("Neque porro quisquam est qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit...\n".getBytes());
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
index 35e4855..8f828fb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
@@ -25,12 +25,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.EOFException;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -69,7 +68,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
 
     @SuppressWarnings("unchecked")
     private void load() {
-        try (SafeObjectInputStream is = new SafeObjectInputStream(new FileInputStream(file))) {
+        try (SafeObjectInputStream is = new SafeObjectInputStream(Files.newInputStream(file.toPath()))) {
             Object obj = is.readObject();
             if (!(obj instanceof HashMap))
                 throw new ConnectException("Expected HashMap but found " + obj.getClass());
@@ -80,8 +79,8 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
                 ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
                 data.put(key, value);
             }
-        } catch (FileNotFoundException | EOFException e) {
-            // FileNotFoundException: Ignore, may be new.
+        } catch (NoSuchFileException | EOFException e) {
+            // NoSuchFileException: Ignore, may be new.
             // EOFException: Ignore, this means the file was missing or corrupt
         } catch (IOException | ClassNotFoundException e) {
             throw new ConnectException(e);
@@ -90,7 +89,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
 
     @Override
     protected void save() {
-        try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file))) {
+        try (ObjectOutputStream os = new ObjectOutputStream(Files.newOutputStream(file.toPath()))) {
             Map<byte[], byte[]> raw = new HashMap<>();
             for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : data.entrySet()) {
                 byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index 95f0749..d716a7c 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -17,7 +17,7 @@
 
 package kafka.log
 
-import java.io.{File, RandomAccessFile}
+import java.io.{Closeable, File, RandomAccessFile}
 import java.nio.channels.FileChannel
 import java.nio.file.Files
 import java.nio.{ByteBuffer, MappedByteBuffer}
@@ -39,7 +39,7 @@ import scala.math.ceil
  * @param maxIndexSize The maximum index size in bytes.
  */
 abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long,
-                                   val maxIndexSize: Int = -1, val writable: Boolean) extends Logging {
+                                   val maxIndexSize: Int = -1, val writable: Boolean) extends Closeable with Logging {
 
   // Length of the index file
   @volatile
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index caca9a8..49c887b 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -16,9 +16,10 @@
  */
 package kafka.log
 
-import java.io._
+import java.io.File
 import java.nio.ByteBuffer
-import java.nio.file.Files
+import java.nio.channels.FileChannel
+import java.nio.file.{Files, StandardOpenOption}
 
 import kafka.log.Log.offsetFromFile
 import kafka.server.LogOffsetMetadata
@@ -430,12 +431,9 @@ object ProducerStateManager {
     val crc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.limit() - ProducerEntriesOffset)
     ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc)
 
-    val fos = new FileOutputStream(file)
-    try {
-      fos.write(buffer.array, buffer.arrayOffset, buffer.limit())
-    } finally {
-      fos.close()
-    }
+    val fileChannel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+    try fileChannel.write(buffer)
+    finally fileChannel.close()
   }
 
   private def isSnapshotFile(file: File): Boolean = file.getName.endsWith(Log.ProducerSnapshotFileSuffix)
diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala
index 349f0ce..da7fce8 100644
--- a/core/src/main/scala/kafka/log/TransactionIndex.scala
+++ b/core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -81,8 +81,8 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
   }
 
   private def openChannel(): FileChannel = {
-    val channel = FileChannel.open(file.toPath, StandardOpenOption.READ, StandardOpenOption.WRITE,
-      StandardOpenOption.CREATE)
+    val channel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.READ,
+      StandardOpenOption.WRITE)
     maybeChannel = Some(channel)
     channel.position(channel.size)
     channel
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 8ac9864..2b915a0 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -18,8 +18,9 @@
 package kafka.server
 
 import java.io._
-import java.nio.file.Files
+import java.nio.file.{Files, NoSuchFileException}
 import java.util.Properties
+
 import kafka.utils._
 import org.apache.kafka.common.utils.Utils
 
@@ -70,7 +71,7 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
             throw new IOException("Unrecognized version of the server meta.properties file: " + version)
         }
       } catch {
-        case _: FileNotFoundException =>
+        case _: NoSuchFileException =>
           warn("No meta.properties file under dir %s".format(file.getAbsolutePath()))
           None
         case e1: Exception =>
diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
index 4c1011f..1878ae2 100644
--- a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
@@ -84,7 +84,7 @@ class CheckpointFile[T](val file: File,
       new IOException(s"Malformed line in checkpoint file (${file.getAbsolutePath}): $line'")
     lock synchronized {
       try {
-        val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))
+        val reader = Files.newBufferedReader(path)
         var line: String = null
         try {
           line = reader.readLine()
diff --git a/core/src/main/scala/kafka/utils/FileLock.scala b/core/src/main/scala/kafka/utils/FileLock.scala
index d1edc83..c0afbfb 100644
--- a/core/src/main/scala/kafka/utils/FileLock.scala
+++ b/core/src/main/scala/kafka/utils/FileLock.scala
@@ -18,7 +18,7 @@
 
 import java.io._
 import java.nio.channels._
-import java.nio.file.{FileAlreadyExistsException, Files}
+import java.nio.file.StandardOpenOption
 
 /**
  * A file lock a la flock/funlock
@@ -27,10 +27,8 @@ import java.nio.file.{FileAlreadyExistsException, Files}
  */
 class FileLock(val file: File) extends Logging {
 
-  try Files.createFile(file.toPath) // create the file if it doesn't exist
-  catch { case _: FileAlreadyExistsException => }
-
-  private val channel = new RandomAccessFile(file, "rw").getChannel()
+  private val channel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.READ,
+    StandardOpenOption.WRITE)
   private var flock: java.nio.channels.FileLock = null
 
   /**
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index b79b679..3c2b353 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -12,7 +12,7 @@
   */
 package kafka.api
 
-import java.io.FileOutputStream
+import java.nio.file.Files
 import java.util.Collections
 import java.util.concurrent.{ExecutionException, TimeUnit}
 
@@ -159,7 +159,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
   @Test
   def testConsumerGroupServiceWithAuthenticationFailure() {
     val propsFile = TestUtils.tempFile()
-    val propsStream = new FileOutputStream(propsFile)
+    val propsStream = Files.newOutputStream(propsFile.toPath)
     propsStream.write("security.protocol=SASL_PLAINTEXT\n".getBytes())
     propsStream.write(s"sasl.mechanism=$kafkaClientSaslMechanism".getBytes())
     propsStream.close()
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 38b9f45..49553e8 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -18,15 +18,15 @@
 
 package kafka.server
 
-import java.io.{Closeable, File, FileInputStream, FileWriter}
-import java.nio.file.{Files, StandardCopyOption}
+import java.io.{Closeable, File, FileWriter}
+import java.nio.file.{Files, Paths, StandardCopyOption}
 import java.lang.management.ManagementFactory
 import java.security.KeyStore
 import java.util
 import java.util.{Collections, Properties}
 import java.util.concurrent._
-import javax.management.ObjectName
 
+import javax.management.ObjectName
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.MetricName
 import kafka.admin.ConfigCommand
@@ -57,7 +57,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
 import org.apache.kafka.test.TestSslUtils
 import org.junit.Assert._
-import org.junit.{After, Before, Test, Ignore}
+import org.junit.{After, Before, Ignore, Test}
 
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
@@ -1056,13 +1056,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     def load(props: Properties): KeyStore = {
       val ks = KeyStore.getInstance("JKS")
       val password = props.get(SSL_TRUSTSTORE_PASSWORD_CONFIG).asInstanceOf[Password].value
-      val in = new FileInputStream(props.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG))
+      val in = Files.newInputStream(Paths.get(props.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)))
       try {
         ks.load(in, password.toCharArray)
+        ks
       } finally {
         in.close()
       }
-      ks
     }
     val cert1 = load(trustStore1Props).getCertificate("kafka")
     val cert2 = load(trustStore2Props).getCertificate("kafka")
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index ee069e2..98e568b 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -17,9 +17,10 @@
 
 package other.kafka
 
-import java.io.{File, FileOutputStream, PrintWriter}
-import javax.imageio.ImageIO
+import java.io.{File, PrintWriter}
+import java.nio.file.{Files, StandardOpenOption}
 
+import javax.imageio.ImageIO
 import kafka.admin.ReassignPartitionsCommand
 import kafka.admin.ReassignPartitionsCommand.Throttle
 import org.apache.kafka.common.TopicPartition
@@ -310,7 +311,7 @@ object ReplicationQuotasTestRig {
     }
 
     def append(message: String): Unit = {
-      val stream = new FileOutputStream(log, true)
+      val stream = Files.newOutputStream(log.toPath, StandardOpenOption.APPEND)
       new PrintWriter(stream) {
         append(message)
         close
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index b385a2a..3c8dfa2 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -20,6 +20,7 @@ package kafka
 import java.io._
 import java.nio._
 import java.nio.channels._
+import java.nio.file.StandardOpenOption
 import java.util.{Properties, Random}
 
 import joptsimple._
@@ -188,20 +189,22 @@ object TestLinearWriteSpeed {
     }
     def close() {
       raf.close()
+      Utils.delete(file)
     }
   }
 
   class ChannelWritable(val file: File, val content: ByteBuffer) extends Writable {
     file.deleteOnExit()
-    val raf = new RandomAccessFile(file, "rw")
-    val channel = raf.getChannel
+    val channel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.READ,
+      StandardOpenOption.WRITE)
     def write(): Int = {
       channel.write(content)
       content.rewind()
       content.limit()
     }
     def close() {
-      raf.close()
+      channel.close()
+      Utils.delete(file)
     }
   }
 
diff --git a/core/src/test/scala/other/kafka/TestTruncate.scala b/core/src/test/scala/other/kafka/TestTruncate.scala
index e90d57d..ba9c799 100644
--- a/core/src/test/scala/other/kafka/TestTruncate.scala
+++ b/core/src/test/scala/other/kafka/TestTruncate.scala
@@ -19,6 +19,8 @@ package kafka
 
 import java.io._
 import java.nio._
+import java.nio.channels.FileChannel
+import java.nio.file.StandardOpenOption
 
 /* This code tests the correct function of java's FileChannel.truncate--some platforms don't work. */
 object TestTruncate {
@@ -26,7 +28,7 @@ object TestTruncate {
   def main(args: Array[String]): Unit = {
     val name = File.createTempFile("kafka", ".test")
     name.deleteOnExit()
-    val file = new RandomAccessFile(name, "rw").getChannel()
+    val file = FileChannel.open(name.toPath, StandardOpenOption.READ, StandardOpenOption.WRITE)
     val buffer = ByteBuffer.allocate(12)
     buffer.putInt(4).putInt(4).putInt(4)
     buffer.rewind()
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index 18be167..469cf92 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -16,7 +16,8 @@
  */
 package kafka
 
-import java.io.{File, FileOutputStream}
+import java.io.File
+import java.nio.file.Files
 import java.util
 
 import kafka.server.KafkaConfig
@@ -104,14 +105,13 @@ class KafkaTest {
     val file = File.createTempFile("kafkatest", ".properties")
     file.deleteOnExit()
 
-    val writer = new FileOutputStream(file)
-    lines.foreach { l =>
-      writer.write(l.getBytes)
-      writer.write("\n".getBytes)
-    }
-
-    writer.close
-
-    file.getAbsolutePath
+    val writer = Files.newOutputStream(file.toPath)
+    try {
+      lines.foreach { l =>
+        writer.write(l.getBytes)
+        writer.write("\n".getBytes)
+      }
+      file.getAbsolutePath
+    } finally writer.close()
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index 0a44881..1e762b3 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -14,13 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package unit.kafka.server
+package kafka.server
 
 import java.util
 import java.util.Properties
 
 import kafka.log.LogConfig
-import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 44ed552..4f09dd2 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -17,7 +17,8 @@
 
 package kafka.tools
 
-import java.io.{FileOutputStream, PrintStream}
+import java.io.PrintStream
+import java.nio.file.Files
 
 import kafka.common.MessageFormatter
 import kafka.tools.ConsoleConsumer.ConsumerWrapper
@@ -302,7 +303,7 @@ class ConsoleConsumerTest {
   @Test
   def shouldParseConfigsFromFile() {
     val propsFile = TestUtils.tempFile()
-    val propsStream = new FileOutputStream(propsFile)
+    val propsStream = Files.newOutputStream(propsFile.toPath)
     propsStream.write("request.timeout.ms=1000\n".getBytes())
     propsStream.write("group.id=group1".getBytes())
     propsStream.close()
@@ -324,7 +325,7 @@ class ConsoleConsumerTest {
 
     // different in all three places
     var propsFile = TestUtils.tempFile()
-    var propsStream = new FileOutputStream(propsFile)
+    var propsStream = Files.newOutputStream(propsFile.toPath)
     propsStream.write("group.id=group-from-file".getBytes())
     propsStream.close()
     var args: Array[String] = Array(
@@ -344,7 +345,7 @@ class ConsoleConsumerTest {
 
     // the same in all three places
     propsFile = TestUtils.tempFile()
-    propsStream = new FileOutputStream(propsFile)
+    propsStream = Files.newOutputStream(propsFile.toPath)
     propsStream.write("group.id=test-group".getBytes())
     propsStream.close()
     args = Array(
@@ -361,7 +362,7 @@ class ConsoleConsumerTest {
 
     // different via --consumer-property and --consumer.config
     propsFile = TestUtils.tempFile()
-    propsStream = new FileOutputStream(propsFile)
+    propsStream = Files.newOutputStream(propsFile.toPath)
     propsStream.write("group.id=group-from-file".getBytes())
     propsStream.close()
     args = Array(
@@ -395,7 +396,7 @@ class ConsoleConsumerTest {
 
     // different via --group and --consumer.config
     propsFile = TestUtils.tempFile()
-    propsStream = new FileOutputStream(propsFile)
+    propsStream = Files.newOutputStream(propsFile.toPath)
     propsStream.write("group.id=group-from-file".getBytes())
     propsStream.close()
     args = Array(
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d2aae2c..db45196 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -21,12 +21,13 @@ import java.io._
 import java.nio._
 import java.nio.channels._
 import java.nio.charset.{Charset, StandardCharsets}
+import java.nio.file.{Files, StandardOpenOption}
 import java.security.cert.X509Certificate
 import java.time.Duration
 import java.util.{Collections, Properties}
 import java.util.concurrent.{Callable, Executors, TimeUnit}
-import javax.net.ssl.X509TrustManager
 
+import javax.net.ssl.X509TrustManager
 import kafka.api._
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log._
@@ -119,7 +120,8 @@ object TestUtils extends Logging {
   /**
    * Create a temporary file and return an open file channel for this file
    */
-  def tempChannel(): FileChannel = new RandomAccessFile(tempFile(), "rw").getChannel()
+  def tempChannel(): FileChannel =
+    FileChannel.open(tempFile().toPath, StandardOpenOption.READ, StandardOpenOption.WRITE)
 
   /**
    * Create a kafka server instance with appropriate test settings
@@ -879,11 +881,12 @@ object TestUtils extends Logging {
     file.close()
   }
 
-  def appendNonsenseToFile(fileName: File, size: Int) {
-    val file = new FileOutputStream(fileName, true)
-    for (_ <- 0 until size)
-      file.write(random.nextInt(255))
-    file.close()
+  def appendNonsenseToFile(file: File, size: Int) {
+    val outputStream = Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND)
+    try {
+      for (_ <- 0 until size)
+        outputStream.write(random.nextInt(255))
+    } finally outputStream.close()
   }
 
   def checkForPhantomInSyncReplicas(zkClient: KafkaZkClient, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int]) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
index 5a1d07f..5002efc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
@@ -18,9 +18,10 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.streams.errors.TopologyException;
 
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Objects;
 
 /**
@@ -68,8 +69,8 @@ public class Printed<K, V> {
             throw new TopologyException("filePath can't be an empty string");
         }
         try {
-            return new Printed<>(new FileOutputStream(filePath));
-        } catch (final FileNotFoundException e) {
+            return new Printed<>(Files.newOutputStream(Paths.get(filePath)));
+        } catch (final IOException e) {
             throw new TopologyException("Unable to write stream to file at [" + filePath + "] " + e.getMessage());
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 9f0e1f8..c8367b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -23,14 +23,12 @@ import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.EOFException;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -123,9 +121,7 @@ public class OffsetCheckpoint {
      */
     public Map<TopicPartition, Long> read() throws IOException {
         synchronized (lock) {
-            try (BufferedReader reader
-                     = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))) {
-
+            try (BufferedReader reader = Files.newBufferedReader(file.toPath())) {
                 final int version = readInt(reader);
                 switch (version) {
                     case 0:
@@ -154,7 +150,7 @@ public class OffsetCheckpoint {
                     default:
                         throw new IllegalArgumentException("Unknown offset checkpoint version: " + version);
                 }
-            } catch (final FileNotFoundException e) {
+            } catch (final NoSuchFileException e) {
                 return Collections.emptyMap();
             }
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
index a50fce9..2526138 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
@@ -28,11 +28,12 @@ import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
@@ -63,7 +64,7 @@ public class PrintedTest {
         final Processor<String, Integer> processor = processorSupplier.get();
         processor.process("hi", 1);
         processor.close();
-        try (final FileInputStream stream = new FileInputStream(file)) {
+        try (final InputStream stream = Files.newInputStream(file.toPath())) {
             final byte[] data = new byte[stream.available()];
             stream.read(data);
             assertThat(new String(data, StandardCharsets.UTF_8.name()), equalTo("[processor]: hi, 1\n"));
@@ -130,4 +131,4 @@ public class PrintedTest {
     public void shouldThrowTopologyExceptionIfFilePathDoesntExist() {
         Printed.toFile("/this/should/not/exist");
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index e37f6a6..3374851 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -43,8 +43,9 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -658,7 +659,7 @@ public class GlobalStateManagerImplTest {
 
     private void writeCorruptCheckpoint() throws IOException {
         final File checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
-        try (final FileOutputStream stream = new FileOutputStream(checkpointFile)) {
+        try (final OutputStream stream = Files.newOutputStream(checkpointFile.toPath())) {
             stream.write("0\n1\nfoo".getBytes());
         }
     }
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
index 9533f6e..9d23bf3 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
@@ -25,10 +25,10 @@ import org.apache.kafka.common.utils.Exit;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
 
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Properties;
 
 import static net.sourceforge.argparse4j.impl.Arguments.store;
@@ -161,9 +161,9 @@ public class VerifiableLog4jAppender {
      * we use VerifiableProducer from the development tools package, and run it against 0.8.X.X kafka jars.
      * Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate.
      */
-    public static Properties loadProps(String filename) throws IOException, FileNotFoundException {
+    public static Properties loadProps(String filename) throws IOException {
         Properties props = new Properties();
-        try (InputStream propStream = new FileInputStream(filename)) {
+        try (InputStream propStream = Files.newInputStream(Paths.get(filename))) {
             props.load(propStream);
         }
         return props;
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index 744142b..6dcde61 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -35,10 +35,10 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Exit;
 
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Properties;
 
 import static net.sourceforge.argparse4j.impl.Arguments.store;
@@ -198,9 +198,9 @@ public class VerifiableProducer {
      * we use VerifiableProducer from the development tools package, and run it against 0.8.X.X kafka jars.
      * Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate.
      */
-    public static Properties loadProps(String filename) throws IOException, FileNotFoundException {
+    public static Properties loadProps(String filename) throws IOException {
         Properties props = new Properties();
-        try (InputStream propStream = new FileInputStream(filename)) {
+        try (InputStream propStream = Files.newInputStream(Paths.get(filename))) {
             props.load(propStream);
         }
         return props;
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/basic/BasicPlatformTest.java b/tools/src/test/java/org/apache/kafka/trogdor/basic/BasicPlatformTest.java
index c0dd680..a2d8ef1 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/basic/BasicPlatformTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/basic/BasicPlatformTest.java
@@ -26,7 +26,6 @@ import org.junit.rules.Timeout;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -41,7 +40,7 @@ public class BasicPlatformTest {
     public void testCreateBasicPlatform() throws Exception {
         File configFile = TestUtils.tempFile();
         try {
-            try (OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(configFile),
+            try (OutputStreamWriter writer = new OutputStreamWriter(Files.newOutputStream(configFile.toPath()),
                     StandardCharsets.UTF_8)) {
                 writer.write("{\n");
                 writer.write("  \"platform\": \"org.apache.kafka.trogdor.basic.BasicPlatform\",\n");


Mime
View raw message