kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Add AUTO_OFFSET_RESET_CONFIG to StreamsConfig; remove TOTAL_RECORDS_TO_PROCESS from StreamsConfig
Date Tue, 01 Mar 2016 00:05:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 845c6eae1 -> 5da935ef7


MINOR: Add AUTO_OFFSET_RESET_CONFIG to StreamsConfig; remove TOTAL_RECORDS_TO_PROCESS from
StreamsConfig

and remove TOTAL_RECORDS_TO_PROCESS
guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #985 from ymatsuda/config_params


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5da935ef
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5da935ef
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5da935ef

Branch: refs/heads/trunk
Commit: 5da935ef78ecf4a671bddc632cfd36ef65bd6f6d
Parents: 845c6ea
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Mon Feb 29 16:05:46 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Feb 29 16:05:46 2016 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java   |  2 +-
 .../examples/pageview/PageViewTypedJob.java      |  2 +-
 .../examples/pageview/PageViewUnTypedJob.java    |  2 +-
 .../kafka/streams/examples/pipe/PipeJob.java     |  2 +-
 .../streams/examples/wordcount/WordCountJob.java |  2 +-
 .../wordcount/WordCountProcessorJob.java         |  2 +-
 .../org/apache/kafka/streams/StreamsConfig.java  | 19 ++++++++++---------
 .../processor/internals/StreamThread.java        |  7 -------
 8 files changed, 16 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5da935ef/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 356f0aa..bd9efc3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -82,7 +82,7 @@ public class ConsumerConfig extends AbstractConfig {
      * <code>auto.offset.reset</code>
      */
     public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
-    private static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial
offset in Kafka or if the current offset does not exist any more on the server (e.g. because
that data has been deleted): <ul><li>earliest: automatically reset the offset
to the earliest offset<li>latest: automatically reset the offset to the latest offset</li><li>none:
throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything
else: throw exception to the consumer.</li></ul>";
+    public static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial
offset in Kafka or if the current offset does not exist any more on the server (e.g. because
that data has been deleted): <ul><li>earliest: automatically reset the offset
to the earliest offset<li>latest: automatically reset the offset to the latest offset</li><li>none:
throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything
else: throw exception to the consumer.</li></ul>";
 
     /**
      * <code>fetch.min.bytes</code>

http://git-wip-us.apache.org/repos/asf/kafka/blob/5da935ef/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
index 3f9b283..f7266e3 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
@@ -93,7 +93,7 @@ public class PageViewTypedJob {
         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
 
         // setting offset reset to earliest so that we can re-run the demo code with the
same pre-loaded data
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStreamBuilder builder = new KStreamBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5da935ef/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
index 065f5f5..3241b8f 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
@@ -69,7 +69,7 @@ public class PageViewUntypedJob {
         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
 
         // setting offset reset to earliest so that we can re-run the demo code with the
same pre-loaded data
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStreamBuilder builder = new KStreamBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5da935ef/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
index 9e737ba..79649d1 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
@@ -48,7 +48,7 @@ public class PipeJob {
         props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
         // setting offset reset to earliest so that we can re-run the demo code with the
same pre-loaded data
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStreamBuilder builder = new KStreamBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5da935ef/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
index da6b095..965eb79 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
@@ -60,7 +60,7 @@ public class WordCountJob {
         props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
         // setting offset reset to earliest so that we can re-run the demo code with the
same pre-loaded data
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStreamBuilder builder = new KStreamBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5da935ef/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
index 61e8335..f5dd775 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
@@ -115,7 +115,7 @@ public class WordCountProcessorJob {
         props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
         // setting offset reset to earliest so that we can re-run the demo code with the
same pre-loaded data
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         TopologyBuilder builder = new TopologyBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5da935ef/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 71d1a6a..65ec969 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.internals.StreamThread;
 import java.util.Map;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
 public class StreamsConfig extends AbstractConfig {
 
@@ -70,10 +71,6 @@ public class StreamsConfig extends AbstractConfig {
     public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
     private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds
to wait before deleting state when a partition has migrated.";
 
-    /** <code>total.records.to.process</code> */
-    public static final String TOTAL_RECORDS_TO_PROCESS = "total.records.to.process";
-    private static final String TOTAL_RECORDS_TO_DOC = "Exit after processing this many records.";
-
     /** <code>timestamp.extractor</code> */
     public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
     private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class
that implements the <code>TimestampExtractor</code> interface.";
@@ -117,6 +114,9 @@ public class StreamsConfig extends AbstractConfig {
     /** <code>client.id</code> */
     public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
 
+    /** <code>auto.offset.reset</code> */
+    public static final String AUTO_OFFSET_RESET_CONFIG = ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+
     private static final String WALLCLOCK_TIMESTAMP_EXTRACTOR = "org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor";
 
     static {
@@ -204,11 +204,12 @@ public class StreamsConfig extends AbstractConfig {
                                         60000,
                                         Importance.LOW,
                                         STATE_CLEANUP_DELAY_MS_DOC)
-                                .define(TOTAL_RECORDS_TO_PROCESS,
-                                        Type.LONG,
-                                        -1L,
-                                        Importance.LOW,
-                                        TOTAL_RECORDS_TO_DOC)
+                                .define(AUTO_OFFSET_RESET_CONFIG,
+                                        Type.STRING,
+                                        "latest",
+                                        in("latest", "earliest", "none"),
+                                        Importance.MEDIUM,
+                                        ConsumerConfig.AUTO_OFFSET_RESET_DOC)
                                 .define(METRIC_REPORTER_CLASSES_CONFIG,
                                         Type.LIST,
                                         "",

http://git-wip-us.apache.org/repos/asf/kafka/blob/5da935ef/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e35eb89..b8ff135 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -97,7 +97,6 @@ public class StreamThread extends Thread {
     private final long pollTimeMs;
     private final long cleanTimeMs;
     private final long commitTimeMs;
-    private final long totalRecordsToProcess;
     private final StreamsMetricsImpl sensors;
 
     private StreamPartitionAssignor partitionAssignor = null;
@@ -202,7 +201,6 @@ public class StreamThread extends Thread {
         this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
         this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
         this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
-        this.totalRecordsToProcess = config.getLong(StreamsConfig.TOTAL_RECORDS_TO_PROCESS);
 
         this.lastClean = Long.MAX_VALUE; // the cleaning cycle won't start until partition
assignment
         this.lastCommit = time.milliseconds();
@@ -428,11 +426,6 @@ public class StreamThread extends Thread {
             return false;
         }
 
-        if (totalRecordsToProcess >= 0 && recordsProcessed >= totalRecordsToProcess)
{
-            log.debug("Shutting down as we've reached the user configured limit of {} records
to process.", totalRecordsToProcess);
-            return false;
-        }
-
         return true;
     }
 


Mime
View raw message