kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: MINOR: FileStreamSinkTask should create file if it doesn't exist (#5406)
Date Sat, 21 Jul 2018 04:01:15 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5d2bf63  MINOR: FileStreamSinkTask should create file if it doesn't exist (#5406)
5d2bf63 is described below

commit 5d2bf6328e1c5b054017419cf4de562dc8a3ec7a
Author: Konstantine Karantasis <konstantine@confluent.io>
AuthorDate: Fri Jul 20 21:01:10 2018 -0700

    MINOR: FileStreamSinkTask should create file if it doesn't exist (#5406)
    
    A recent change from `new FileOutputStream` to `Files.newOutputStream` missed the `CREATE` flag (which is necessary in addition to `APPEND`).
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../kafka/connect/file/FileStreamSinkTask.java     |  6 ++-
 .../kafka/connect/file/FileStreamSinkTaskTest.java | 52 +++++++++++++++++++++-
 2 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
index 328dee6..3d1d2b8 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
@@ -63,10 +63,12 @@ public class FileStreamSinkTask extends SinkTask {
             outputStream = System.out;
         } else {
             try {
-                outputStream = new PrintStream(Files.newOutputStream(Paths.get(filename), StandardOpenOption.APPEND), false,
+                outputStream = new PrintStream(
+                    Files.newOutputStream(Paths.get(filename), StandardOpenOption.CREATE, StandardOpenOption.APPEND),
+                    false,
                     StandardCharsets.UTF_8.name());
             } catch (IOException e) {
-                throw new ConnectException("Couldn't find or create file for FileStreamSinkTask", e);
+                throw new ConnectException("Couldn't find or create file '" + filename + "' for FileStreamSinkTask", e);
             }
         }
     }
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
index c7ec9da..a5142a1 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
@@ -21,12 +21,21 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
 
@@ -36,11 +45,17 @@ public class FileStreamSinkTaskTest {
     private ByteArrayOutputStream os;
     private PrintStream printStream;
 
+    @Rule
+    public TemporaryFolder topDir = new TemporaryFolder();
+    private String outputFile;
+
     @Before
-    public void setup() {
+    public void setup() throws Exception {
         os = new ByteArrayOutputStream();
         printStream = new PrintStream(os);
         task = new FileStreamSinkTask(printStream);
+        File outputDir = topDir.newFolder("file-stream-sink-" + UUID.randomUUID().toString());
+        outputFile = outputDir.getCanonicalPath() + "/connect.output";
     }
 
     @Test
@@ -66,4 +81,39 @@ public class FileStreamSinkTaskTest {
         task.flush(offsets);
         assertEquals("line1" + newLine + "line2" + newLine + "line3" + newLine, os.toString());
     }
+
+    @Test
+    public void testStart() throws IOException {
+        task = new FileStreamSinkTask();
+        Map<String, String> props = new HashMap<>();
+        props.put(FileStreamSinkConnector.FILE_CONFIG, outputFile);
+        task.start(props);
+
+        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        task.put(Arrays.asList(
+                new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line0", 1)
+        ));
+        offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(1L));
+        task.flush(offsets);
+
+        int numLines = 3;
+        String[] lines = new String[numLines];
+        int i = 0;
+        try (BufferedReader reader = Files.newBufferedReader(Paths.get(outputFile))) {
+            lines[i++] = reader.readLine();
+            task.put(Arrays.asList(
+                    new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line1", 2),
+                    new SinkRecord("topic2", 0, null, null, Schema.STRING_SCHEMA, "line2", 1)
+            ));
+            offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(2L));
+            offsets.put(new TopicPartition("topic2", 0), new OffsetAndMetadata(1L));
+            task.flush(offsets);
+            lines[i++] = reader.readLine();
+            lines[i++] = reader.readLine();
+        }
+
+        while (--i >= 0) {
+            assertEquals("line" + i, lines[i]);
+        }
+    }
 }


Mime
View raw message