kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [kafka] branch trunk updated: KAFKA-4335: Add batch.size to FileStreamSource connector to prevent OOM
Date Sat, 06 Jan 2018 02:32:13 GMT
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ecf0ab4  KAFKA-4335: Add batch.size to FileStreamSource connector to prevent OOM
ecf0ab4 is described below

commit ecf0ab42c1591be2a3784b6e107567ad883e0330
Author: Study <ph.study@gmail.com>
AuthorDate: Fri Jan 5 18:31:53 2018 -0800

    KAFKA-4335: Add batch.size to FileStreamSource connector to prevent OOM
    
    When the source file of `FileStreamSource` is large, `FileStreamSourceTask.poll()`
can run out of memory (OOM). This pull request adds a `batch.size` parameter (default 2000)
that limits the number of records returned by a single `poll()` call.
    
    *More detailed description of your change,
    if necessary. The PR title and PR message become
    the squashed commit message, so use a separate
    comment to ping reviewers.*
    
    *Summary of testing strategy (including rationale)
    for the feature or bug fix. Unit and/or integration
    tests are expected for any behaviour change and
    system tests should be considered for larger changes.*
    
    Author: Study <ph.study@gmail.com>
    
    Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
    
    Closes #4356 from phstudy/KAFKA-4335
---
 .../connect/file/FileStreamSourceConnector.java    | 16 ++++++++++++++-
 .../kafka/connect/file/FileStreamSourceTask.java   | 13 ++++++++++++
 .../connect/file/FileStreamSourceTaskTest.java     | 24 ++++++++++++++++++++++
 3 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
index 335fe92..59006da 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
@@ -36,13 +36,18 @@ import java.util.Map;
 public class FileStreamSourceConnector extends SourceConnector {
     public static final String TOPIC_CONFIG = "topic";
     public static final String FILE_CONFIG = "file";
+    public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
+
+    public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
 
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. If not
specified, the standard input will be used")
-        .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
+        .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to")
+        .define(TASK_BATCH_SIZE_CONFIG, Type.INT, Importance.LOW, "The maximum number of
records the Source task can read from file one time");
 
     private String filename;
     private String topic;
+    private int batchSize = DEFAULT_TASK_BATCH_SIZE;
 
     @Override
     public String version() {
@@ -57,6 +62,14 @@ public class FileStreamSourceConnector extends SourceConnector {
             throw new ConnectException("FileStreamSourceConnector configuration must include
'topic' setting");
         if (topic.contains(","))
             throw new ConnectException("FileStreamSourceConnector should only have a single
topic when used as a source.");
+
+        if (props.containsKey(TASK_BATCH_SIZE_CONFIG)) {
+            try {
+                batchSize = Integer.parseInt(props.get(TASK_BATCH_SIZE_CONFIG));
+            } catch (NumberFormatException e) {
+                throw new ConnectException("Invalid FileStreamSourceConnector configuration",
e);
+            }
+        }
     }
 
     @Override
@@ -72,6 +85,7 @@ public class FileStreamSourceConnector extends SourceConnector {
         if (filename != null)
             config.put(FILE_CONFIG, filename);
         config.put(TOPIC_CONFIG, topic);
+        config.put(TASK_BATCH_SIZE_CONFIG, String.valueOf(batchSize));
         configs.add(config);
         return configs;
     }
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
index 8edf385..482102f 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
@@ -50,6 +50,7 @@ public class FileStreamSourceTask extends SourceTask {
     private char[] buffer = new char[1024];
     private int offset = 0;
     private String topic = null;
+    private int batchSize = FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE;
 
     private Long streamOffset;
 
@@ -70,6 +71,14 @@ public class FileStreamSourceTask extends SourceTask {
         topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
         if (topic == null)
             throw new ConnectException("FileStreamSourceTask config missing topic setting");
+
+        if (props.containsKey(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG)) {
+            try {
+                batchSize = Integer.parseInt(props.get(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG));
+            } catch (NumberFormatException e) {
+                throw new ConnectException("Invalid FileStreamSourceTask configuration",
e);
+            }
+        }
     }
 
     @Override
@@ -146,6 +155,10 @@ public class FileStreamSourceTask extends SourceTask {
                                 records = new ArrayList<>();
                             records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset),
topic, null,
                                     null, null, VALUE_SCHEMA, line, System.currentTimeMillis()));
+
+                            if (records.size() >= batchSize) {
+                                return records;
+                            }
                         }
                     } while (line != null);
                 }
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
index cde6c43..3cb7128 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
@@ -127,6 +127,30 @@ public class FileStreamSourceTaskTest extends EasyMockSupport {
         task.stop();
     }
 
+    @Test
+    public void testBatchSize() throws IOException, InterruptedException {
+        expectOffsetLookupReturnNone();
+        replay();
+
+        config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "5000");
+        task.start(config);
+
+        FileOutputStream os = new FileOutputStream(tempFile);
+        for (int i = 0; i < 10_000; i++) {
+            os.write("Neque porro quisquam est qui dolorem ipsum quia dolor sit amet, consectetur,
adipisci velit...\n".getBytes());
+        }
+        os.flush();
+
+        List<SourceRecord> records = task.poll();
+        assertEquals(5000, records.size());
+
+        records = task.poll();
+        assertEquals(5000, records.size());
+
+        os.close();
+        task.stop();
+    }
+
     @Test(expected = ConnectException.class)
     public void testMissingTopic() throws InterruptedException {
         replay();

-- 
To stop receiving notification emails like this one, please contact
commits@kafka.apache.org.

Mime
View raw message