kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: MINOR: close iterator on doc example
Date Thu, 07 Sep 2017 05:12:29 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9b85cf9ed -> b041c8d87


MINOR: close iterator on doc example

Some iterator usage examples in the docs are missing an explicit close() call after use.
We should remind users to close iterators, because an unclosed iterator leaves the underlying
file descriptor open and keeps consuming memory.
guozhangwang Ishiihara
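
To illustrate the pattern this commit documents, here is a minimal, self-contained sketch. It does not use the Kafka Streams API: `TrackedIterator` and `forwardAll` are hypothetical stand-ins invented for this example, and the "resource" is just a flag. The point is only that a try-with-resources block closes the iterator even if the loop body throws, whereas a trailing `iter.close()` call is skipped in that case. Kafka's `KeyValueIterator` is declared `Closeable`, so the same construct should apply there, but verify that against the version you use.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class IteratorCloseSketch {

    // Hypothetical stand-in for a state store iterator that holds a
    // resource (for example a file handle) until close() is called.
    static final class TrackedIterator
            implements Iterator<Map.Entry<String, Long>>, AutoCloseable {
        private final Iterator<Map.Entry<String, Long>> delegate;
        private boolean closed = false;

        TrackedIterator(Map<String, Long> data) {
            this.delegate = data.entrySet().iterator();
        }

        @Override
        public boolean hasNext() {
            return !closed && delegate.hasNext();
        }

        @Override
        public Map.Entry<String, Long> next() {
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true; // a real store would release native resources here
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Drains the iterator and closes it on every exit path.
    static List<String> forwardAll(TrackedIterator iter) {
        List<String> out = new ArrayList<>();
        try (TrackedIterator it = iter) {
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                out.add(entry.getKey() + "=" + entry.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new TreeMap<>();
        counts.put("hello", 2L);
        counts.put("world", 1L);

        TrackedIterator iter = new TrackedIterator(counts);
        List<String> forwarded = forwardAll(iter);
        System.out.println(forwarded + " closed=" + iter.isClosed());
    }
}
```

The doc examples in the diff below call `close()` explicitly after the loop, which is equivalent on the normal path; try-with-resources additionally covers the exceptional path.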

Author: Boyang Chen <bychen@pinterest.com>
Author: Boyang Chen <bchen11@outlook.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3714 from abbccdda/add_iterator_close_on_doc

minor fixes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b041c8d8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b041c8d8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b041c8d8

Branch: refs/heads/trunk
Commit: b041c8d87deab536b262e6260334b5bda1d896c8
Parents: 9b85cf9
Author: Boyang Chen <bychen@pinterest.com>
Authored: Wed Sep 6 22:04:37 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Sep 6 22:12:10 2017 -0700

----------------------------------------------------------------------
 docs/streams/developer-guide.html | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b041c8d8/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 99467a7..4da50a2 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -125,6 +125,9 @@
                 context.forward(entry.key, entry.value.toString());
             }
 
+            // it is the caller's responsibility to close the iterator on state store;
+            // otherwise it may lead to memory and file handlers leak depending on the
+            // underlying state store implementation.
             iter.close();
 
             // commit the current processing progress
@@ -152,6 +155,20 @@
     }
 
     @Override
+    public void punctuate(long timestamp) {
+    KeyValueIterator&lt;String, Long&gt; iter = this.kvStore.all();
+
+    while (iter.hasNext()) {
+        KeyValue&lt;String, Long&gt; entry = iter.next();
+        context.forward(entry.key, entry.value.toString());
+    }
+
+    iter.close(); // avoid OOM
+    // commit the current processing progress
+    context.commit();
+    }
+
+    @Override
     public void close() {
     // close any resources managed by this processor.
     // Note: Do not close any StateStores as these are managed
@@ -2107,6 +2124,8 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
         Kafka Streams materializes one state store per stream partition, which means your application will potentially manage many underlying state stores.
         The API to query local state stores enables you to query all of the underlying stores without having to know which partition the data is in.
         The objects returned from <code>KafkaStreams#store(...)</code> are therefore wrapping potentially many underlying state stores.
+        Note that it is the caller's responsibility to close the iterator on state store;
+        otherwise it may lead to OOM and leaked file handlers depending on the state store implementation.
     </p>
 
     <h4><a id="streams_developer-guide_interactive-queries_local-key-value-stores" href="#streams_developer-guide_interactive-queries_local-key-value-stores">Querying local key-value stores</a></h4>
@@ -2152,6 +2171,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
             KeyValue&lt;String, Long&gt; next = range.next();
             System.out.println("count for " + next.key + ": " + value);
           }
+          range.close(); // close iterator to avoid memory leak
 
           // Get the values for all of the keys available in this application instance
           KeyValueIterator&lt;String, Long&gt; range = keyValueStore.all();
@@ -2159,6 +2179,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
             KeyValue&lt;String, Long&gt; next = range.next();
             System.out.println("count for " + next.key + ": " + value);
           }
+          range.close(); // close iterator to avoid memory leak
         </pre>
 
     <h4><a id="streams_developer-guide_interactive-queries_local-window-stores" href="#streams_developer-guide_interactive-queries_local-window-stores">Querying local window stores</a></h4>
@@ -2204,6 +2225,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
             long windowTimestamp = next.key;
             System.out.println("Count of 'world' @ time " + windowTimestamp + " is " + next.value);
           }
+          iterator.close();
         </pre>
 
     <h4><a id="streams_developer-guide_interactive-queries_custom-stores" href="#streams_developer-guide_interactive-queries_custom-stores">Querying local custom state stores</a></h4>

