kafka-commits mailing list archives

From damian...@apache.org
Subject kafka git commit: KAFKA-5985; update javadoc regarding closing iterators
Date Mon, 02 Oct 2017 18:49:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4f4f99532 -> 39d5cdccc


KAFKA-5985; update javadoc regarding closing iterators

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Michael G. Noll <michael@confluent.io>,
Damian Guy <damian.guy@gmail.com>

Closes #3994 from bbejeck/KAFKA-5985_document_need_to_close_iterators


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/39d5cdcc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/39d5cdcc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/39d5cdcc

Branch: refs/heads/trunk
Commit: 39d5cdcccfc0f7d7893188bb22580da0c842a993
Parents: 4f4f995
Author: Bill Bejeck <bill@confluent.io>
Authored: Mon Oct 2 11:49:22 2017 -0700
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Mon Oct 2 11:49:22 2017 -0700

----------------------------------------------------------------------
 docs/streams/developer-guide.html                            | 8 ++++++++
 .../org/apache/kafka/streams/state/KeyValueIterator.java     | 2 +-
 .../apache/kafka/streams/state/ReadOnlyKeyValueStore.java    | 4 ++--
 .../org/apache/kafka/streams/state/ReadOnlySessionStore.java | 6 ++++--
 .../org/apache/kafka/streams/state/ReadOnlyWindowStore.java  | 4 ++++
 .../java/org/apache/kafka/streams/state/SessionStore.java    | 4 ++++
 .../org/apache/kafka/streams/state/WindowStoreIterator.java  | 2 +-
 7 files changed, 24 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/39d5cdcc/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 3368757..a064a5d 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -185,6 +185,7 @@
         <li>In the <code>init</code> method, schedule the punctuation every 1 second and retrieve the local state store by its name "Counts".</li>
         <li>In the <code>process</code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this feature later in the section).</li>
         <li>In the scheduled <code>punctuate</code> method, iterate the local state store and send the aggregated counts to the downstream processor, and commit the current stream state.</li>
+        <li>When done with the <code>KeyValueIterator&lt;String, Long&gt;</code> you <em>must</em> close the iterator, as shown above or use the try-with-resources statement.</li>
     </ul>
 
 
@@ -253,6 +254,13 @@ With deletion enabled, old windows that have expired will be cleaned up by Kafka
 The default retention setting is <code>Windows#maintainMs()</code> + 1 day. This setting can be overriden by specifying <code>StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG</code> in the <code>StreamsConfig</code>.
 </p>
 
+<p>
+One additional note regarding the use of state stores.  Any time you open an <code>Iterator</code> from a state store you <em>must</em> call <code>close()</code> on the iterator
+when you are done working with it to reclaim resources.  Or you can use the iterator from within a try-with-resources statement.
+    By not closing an iterator, you may likely encounter an OOM error.
+</p>
+
+
 <h4><a id="restoration_progress" href="#restoration_progress">Monitoring the Restoration Progress of Fault-tolerant State Stores</a></h4>
 
 <p>

http://git-wip-us.apache.org/repos/asf/kafka/blob/39d5cdcc/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
index 3f44635..70a142b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
 /**
  * Iterator interface of {@link KeyValue}.
  *
- * Users need to call its {@code close} method explicitly upon completeness to release resources,
+ * Users must call its {@code close} method explicitly upon completeness to release resources,
  * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
  *
  * @param <K> Type of keys

http://git-wip-us.apache.org/repos/asf/kafka/blob/39d5cdcc/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
index 76bb47b..0632980 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
@@ -39,7 +39,7 @@ public interface ReadOnlyKeyValueStore<K, V> {
     V get(K key);
 
     /**
-     * Get an iterator over a given range of keys. This iterator MUST be closed after use.
+     * Get an iterator over a given range of keys. This iterator must be closed after use.
      * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
      * and must not return null values. No ordering guarantees are provided.
      * @param from The first key that could be in the range
@@ -51,7 +51,7 @@ public interface ReadOnlyKeyValueStore<K, V> {
     KeyValueIterator<K, V> range(K from, K to);
 
     /**
-     * Return an iterator over all keys in this store. This iterator MUST be closed after use.
+     * Return an iterator over all keys in this store. This iterator must be closed after use.
      * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
      * and must not return null values. No ordering guarantees are provided.
      * @return An iterator of all key/value pairs in the store.

http://git-wip-us.apache.org/repos/asf/kafka/blob/39d5cdcc/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
index e3859fd..ce88b9f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
@@ -30,7 +30,8 @@ import org.apache.kafka.streams.kstream.Windowed;
 public interface ReadOnlySessionStore<K, AGG> {
 
     /**
-     * Retrieve all aggregated sessions for the provided key
+     * Retrieve all aggregated sessions for the provided key.
+     * This iterator must be closed after use.
      *
      * For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest
      * available session to the newest/latest session.
@@ -43,7 +44,8 @@ public interface ReadOnlySessionStore<K, AGG> {
     KeyValueIterator<Windowed<K>, AGG> fetch(final K key);
 
     /**
-     * Retrieve all aggregated sessions for the given range of keys
+     * Retrieve all aggregated sessions for the given range of keys.
+     * This iterator must be closed after use.
      *
      * For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest
      * available session to the newest/latest session.

http://git-wip-us.apache.org/repos/asf/kafka/blob/39d5cdcc/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
index 5252cd6..cff1f6c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
@@ -31,6 +31,8 @@ public interface ReadOnlyWindowStore<K, V> {
     /**
      * Get all the key-value pairs with the given key and the time range from all
      * the existing windows.
+     *
+     * This iterator must be closed after use.
      * <p>
      * The time range is inclusive and applies to the starting timestamp of the window.
      * For example, if we have the following windows:
@@ -64,6 +66,8 @@ public interface ReadOnlyWindowStore<K, V> {
      * Get all the key-value pairs in the given key range and time range from all
      * the existing windows.
      *
+     * This iterator must be closed after use.
+     *
      * @param from      the first key in the range
      * @param to        the last key in the range
      * @param timeFrom  time range start (inclusive)

http://git-wip-us.apache.org/repos/asf/kafka/blob/39d5cdcc/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
index c98f8ab..fcbce5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
@@ -30,6 +30,8 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
      * Fetch any sessions with the matching key and the sessions end is &ge; earliestSessionEndTime and the sessions
      * start is &le; latestSessionStartTime
      *
+     * This iterator must be closed after use.
+     *
      * @param key the key to return sessions for
      * @param earliestSessionEndTime
      * @return iterator of sessions with the matching key and aggregated values
@@ -41,6 +43,8 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
      * Fetch any sessions in the given range of keys and the sessions end is &ge; earliestSessionEndTime and the sessions
      * start is &le; latestSessionStartTime
      *
+     * This iterator must be closed after use.
+     *
      * @param keyFrom The first key that could be in the range
      * @param keyTo The last key that could be in the range
      * @param earliestSessionEndTime

http://git-wip-us.apache.org/repos/asf/kafka/blob/39d5cdcc/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
index fdf0936..c07130e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
@@ -23,7 +23,7 @@ import java.io.Closeable;
 /**
  * Iterator interface of {@link KeyValue} with key typed {@link Long} used for {@link WindowStore#fetch(Object, long, long)}.
  *
- * Users need to call its {@code close} method explicitly upon completeness to release resources,
+ * Users must call its {@code close} method explicitly upon completeness to release resources,
  * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
  *
  * @param <V> Type of values

