kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3439; Added exceptions thrown
Date Tue, 12 Apr 2016 20:01:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 319c6e719 -> ba9456de2


KAFKA-3439; Added exceptions thrown

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Guozhang Wang

Closes #1213 from enothereska/KAFKA-3439-throws


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ba9456de
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ba9456de
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ba9456de

Branch: refs/heads/trunk
Commit: ba9456de2ebf1a34bdf5f6f62a701875822e1923
Parents: 319c6e7
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Tue Apr 12 13:00:54 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Apr 12 13:00:54 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  2 ++
 .../kstream/internals/AbstractStream.java       |  3 ++
 .../kstream/internals/ChangedSerializer.java    |  4 +++
 .../kstream/internals/KStreamAggregate.java     |  3 ++
 .../streams/kstream/internals/KStreamImpl.java  |  3 ++
 .../kstream/internals/KStreamJoinWindow.java    |  3 ++
 .../kstream/internals/KStreamKStreamJoin.java   |  3 ++
 .../kstream/internals/KStreamReduce.java        |  3 ++
 .../kstream/internals/KTableAggregate.java      |  3 ++
 .../streams/kstream/internals/KTableImpl.java   |  3 ++
 .../kstream/internals/KTableKTableJoin.java     |  3 ++
 .../kstream/internals/KTableKTableLeftJoin.java |  3 ++
 .../internals/KTableKTableOuterJoin.java        |  3 ++
 .../internals/KTableKTableRightJoin.java        |  3 ++
 .../streams/kstream/internals/KTableReduce.java |  3 ++
 .../kstream/internals/KTableRepartitionMap.java |  6 ++++
 .../processor/DefaultPartitionGrouper.java      |  3 ++
 .../apache/kafka/streams/processor/TaskId.java  |  9 ++++++
 .../streams/processor/TopologyBuilder.java      |  5 +++
 .../processor/internals/AbstractTask.java       |  6 ++++
 .../internals/InternalTopicManager.java         |  6 ++++
 .../internals/MinTimestampTracker.java          |  3 ++
 .../processor/internals/PartitionGroup.java     |  3 ++
 .../internals/ProcessorContextImpl.java         | 18 +++++++++++
 .../internals/ProcessorStateManager.java        | 14 +++++++++
 .../streams/processor/internals/QuickUnion.java |  3 ++
 .../streams/processor/internals/SinkNode.java   |  3 ++
 .../processor/internals/StandbyContextImpl.java | 33 ++++++++++++++++++++
 .../internals/StreamPartitionAssignor.java      |  4 +++
 .../streams/processor/internals/StreamTask.java |  7 +++++
 .../processor/internals/StreamThread.java       |  5 +++
 .../internals/assignment/AssignmentInfo.java    |  7 +++++
 .../internals/assignment/SubscriptionInfo.java  |  6 ++++
 .../apache/kafka/streams/state/StateSerdes.java |  1 +
 .../org/apache/kafka/streams/state/Stores.java  |  4 +++
 .../streams/state/internals/MemoryLRUCache.java |  6 ++++
 .../state/internals/OffsetCheckpoint.java       | 23 ++++++++++++++
 .../streams/state/internals/RocksDBStore.java   |  9 ++++++
 .../state/internals/RocksDBWindowStore.java     |  3 ++
 39 files changed, 232 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index e8fda10..4d1306d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -138,6 +138,7 @@ public class KafkaStreams {
 
     /**
      * Start the stream instance by starting all its threads.
+     * @throws IllegalStateException if the process was already started
      */
     public synchronized void start() {
         log.debug("Starting Kafka Stream process");
@@ -157,6 +158,7 @@ public class KafkaStreams {
     /**
      * Shutdown this stream instance by signaling all the threads to stop,
      * and then wait for them to join.
+     * @throws IllegalStateException if the process has not started yet
      */
     public synchronized void close() {
         log.debug("Stopping Kafka Stream process");

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index c537465..ebada92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -36,6 +36,9 @@ public abstract class AbstractStream<K> {
         this.sourceNodes = sourceNodes;
     }
 
+    /**
+     * @throws TopologyBuilderException if the streams are not joinable
+     */
     protected Set<String> ensureJoinableWith(AbstractStream<K> other) {
         Set<String> thisSourceNodes = sourceNodes;
         Set<String> otherSourceNodes = other.sourceNodes;

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index 5ea0791..5dbbac9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -38,6 +38,10 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> {
         // do nothing
     }
 
+    /**
+     * @throws StreamsException if the old and new values of the data are both null,
+     * or if both values are non-null
+     */
     @Override
     public byte[] serialize(String topic, Change<T> data) {
         byte[] serializedKey;

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 871a12d..b6d1492 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -61,6 +61,9 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
         }
 
+        /**
+         * @throws StreamsException if key is null
+         */
         @Override
         public void process(K key, V value) {
             // the keys should never be null

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 97a7aac..7030021 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -155,6 +155,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         writeAsText(filePath, null, null);
     }
 
+    /**
+     * @throws TopologyBuilderException if file is not found
+     */
     @Override
     public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
         String name = topology.newName(PRINTING_NAME);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index 94e0b88..864dc9c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -28,6 +28,9 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
 
     private final String windowName;
 
+    /**
+     * @throws TopologyBuilderException if retention period of the join window is less than expected
+     */
     KStreamJoinWindow(String windowName, long windowSizeMs, long retentionPeriodMs) {
         this.windowName = windowName;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index d8caf3c..d13d112 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -62,6 +62,9 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
             otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
         }
 
+        /**
+         * @throws StreamsException if key is null
+         */
         @Override
         public void process(K key, V1 value) {
             // the keys should never be null

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index e37fe34..ed6e216 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -58,6 +58,9 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
         }
 
+        /**
+         * @throws StreamsException if key is null
+         */
         @Override
         public void process(K key, V value) {
             // the keys should never be null

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 806c6e9..4a7c7c0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -63,6 +63,9 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
         }
 
+        /**
+         * @throws StreamsException if key is null
+         */
         @Override
         public void process(K key, Change<V> value) {
             // the keys should never be null

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index ee2c931..f78169e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -157,6 +157,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         writeAsText(filePath, null, null);
     }
 
+    /**
+     * @throws TopologyBuilderException if file is not found
+     */
     @Override
     public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
         String name = topology.newName(PRINTING_NAME);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 24c8da6..36424d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -60,6 +60,9 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
             valueGetter.init(context);
         }
 
+        /**
+         * @throws StreamsException if key is null
+         */
         @Override
         public void process(K key, Change<V1> change) {
             // the keys should never be null

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 4bf45ed..996ebc3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -60,6 +60,9 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
             valueGetter.init(context);
         }
 
+        /**
+         * @throws StreamsException if key is null
+         */
         @Override
         public void process(K key, Change<V1> change) {
             // the keys should never be null

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 49eed53..2a0d477 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -60,6 +60,9 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
             valueGetter.init(context);
         }
 
+        /**
+         * @throws StreamsException if key is null
+         */
         @Override
         public void process(K key, Change<V1> change) {
             // the keys should never be null

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index 7443d4a..fa41ed3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -61,6 +61,9 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
             valueGetter.init(context);
         }
 
+        /**
+         * @throws StreamsException if key is null
+         */
         @Override
         public void process(K key, Change<V1> change) {
             // the keys should never be null

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index d56b3ae..bab6bf3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -60,6 +60,9 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
         }
 
+        /**
+         * @throws StreamsException if key is null
+         */
         @Override
         public void process(K key, Change<V> value) {
             // the keys should never be null

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 142a279..2a7cf1b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -57,6 +57,9 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
         };
     }
 
+    /**
+     * @throws IllegalStateException since this method should never be called
+     */
     @Override
     public void enableSendingOldValues() {
         // this should never be called
@@ -74,6 +77,9 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
 
     private class KTableMapProcessor extends AbstractProcessor<K, Change<V>> {
 
+        /**
+         * @throws StreamsException if key is null
+         */
         @Override
         public void process(K key, Change<V> change) {
             KeyValue<K1, V1> newPair = computeValue(key, change.newValue);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index 999f6a9..405ecd5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -69,6 +69,9 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
         return Collections.unmodifiableMap(groups);
     }
 
+    /**
+     * @throws StreamsException if no metadata can be received for a topic
+     */
     protected int maxNumPartitions(Cluster metadata, Set<String> topics) {
         int maxNumPartitions = 0;
         for (String topic : topics) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index ff21047..fa7c73c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -41,6 +41,9 @@ public class TaskId implements Comparable<TaskId> {
         return topicGroupId + "_" + partition;
     }
 
+    /**
+     * @throws TaskIdFormatException if the string is not a valid TaskId
+     */
     public static TaskId parse(String string) {
         int index = string.indexOf('_');
         if (index <= 0 || index + 1 >= string.length()) throw new TaskIdFormatException(string);
@@ -55,11 +58,17 @@ public class TaskId implements Comparable<TaskId> {
         }
     }
 
+    /**
+     * @throws IOException if cannot write to output stream
+     */
     public void writeTo(DataOutputStream out) throws IOException {
         out.writeInt(topicGroupId);
         out.writeInt(partition);
     }
 
+    /**
+     * @throws IOException if cannot read from input stream
+     */
     public static TaskId readFrom(DataInputStream in) throws IOException {
         return new TaskId(in.readInt(), in.readInt());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 7f5d645..487d5fe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -221,6 +221,7 @@ public class TopologyBuilder {
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param topics the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
+     * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
      */
     public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
         if (nodeFactories.containsKey(name))
@@ -328,6 +329,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
+     * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      */
     public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
         if (nodeFactories.containsKey(name))
@@ -359,6 +361,7 @@ public class TopologyBuilder {
      * @param parentNames the name of one or more source or processor nodes whose output messages this processor should receive
      * and process
      * @return this builder instance so methods can be chained together; never null
+     * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      */
     public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
         if (nodeFactories.containsKey(name))
@@ -386,6 +389,7 @@ public class TopologyBuilder {
      *
      * @param supplier the supplier used to obtain this state store {@link StateStore} instance
      * @return this builder instance so methods can be chained together; never null
+     * @throws TopologyBuilderException if state store supplier is already added
      */
     public final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) {
         if (stateFactories.containsKey(supplier.name())) {
@@ -438,6 +442,7 @@ public class TopologyBuilder {
      *
      * @param processorNames the name of the processors
      * @return this builder instance so methods can be chained together; never null
+     * @throws TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet
      */
     public final TopologyBuilder connectProcessors(String... processorNames) {
         if (processorNames.length < 2)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index b3b6537..c85ecde 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -44,6 +44,9 @@ public abstract class AbstractTask {
     protected final Set<TopicPartition> partitions;
     protected ProcessorContext processorContext;
 
+    /**
+     * @throws ProcessorStateException if the state manager cannot be created
+     */
     protected AbstractTask(TaskId id,
                            String applicationId,
                            Collection<TopicPartition> partitions,
@@ -101,6 +104,9 @@ public abstract class AbstractTask {
 
     public abstract void commit();
 
+    /**
+     * @throws ProcessorStateException if there is an error while closing the state manager
+     */
     public void close() {
         try {
             stateMgr.close(recordCollectorOffsets());

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 3725c4c..536a447 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -51,6 +51,9 @@ public class InternalTopicManager {
 
     private class ZKStringSerializer implements ZkSerializer {
 
+        /**
+         * @throws AssertionError if the byte String encoding type is not supported
+         */
         @Override
         public byte[] serialize(Object data) {
             try {
@@ -60,6 +63,9 @@ public class InternalTopicManager {
             }
         }
 
+        /**
+         * @throws AssertionError if the byte String encoding type is not supported
+         */
         @Override
         public Object deserialize(byte[] bytes) {
             try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
index 717df2c..655b8b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
@@ -32,6 +32,9 @@ public class MinTimestampTracker<E> implements TimestampTracker<E> {
     // record's timestamp
     private long lastKnownTime = NOT_KNOWN;
 
+    /**
+     * @throws NullPointerException if the element is null
+     */
     public void addElement(Stamped<E> elem) {
         if (elem == null) throw new NullPointerException();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index ec89d47..8c6078a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -145,6 +145,9 @@ public class PartitionGroup {
         return timestamp;
     }
 
+    /**
+     * @throws IllegalStateException if the record's partition does not belong to this partition group
+     */
     public int numBuffered(TopicPartition partition) {
         RecordQueue recordQueue = partitionQueues.get(partition);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 1c398ac..10e7d68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -105,6 +105,9 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
         return metrics;
     }
 
+    /**
+     * @throws IllegalStateException if this method is called before {@link #initialized()}
+     */
     @Override
     public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
         if (initialized)
@@ -113,6 +116,9 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
         stateMgr.register(store, loggingEnabled, stateRestoreCallback);
     }
 
+    /**
+     * @throws TopologyBuilderException if an attempt is made to access this state store from an unknown node
+     */
     @Override
     public StateStore getStateStore(String name) {
         ProcessorNode node = task.node();
@@ -127,6 +133,9 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
         return stateMgr.getStore(name);
     }
 
+    /**
+     * @throws IllegalStateException if the task's record is null
+     */
     @Override
     public String topic() {
         if (task.record() == null)
@@ -140,6 +149,9 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
             return topic;
     }
 
+    /**
+     * @throws IllegalStateException if the task's record is null
+     */
     @Override
     public int partition() {
         if (task.record() == null)
@@ -148,6 +160,9 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
         return task.record().partition();
     }
 
+    /**
+     * @throws IllegalStateException if the task's record is null
+     */
     @Override
     public long offset() {
         if (this.task.record() == null)
@@ -156,6 +171,9 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
         return this.task.record().offset();
     }
 
+    /**
+     * @throws IllegalStateException if the task's record is null
+     */
     @Override
     public long timestamp() {
         if (task.record() == null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 665d39f..003b988 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -67,6 +67,9 @@ public class ProcessorStateManager {
     private final boolean isStandby;
     private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
 
+    /**
+     * @throws IOException if any error happens while creating or locking the state directory
+     */
     public ProcessorStateManager(String applicationId, int defaultPartition, Collection<TopicPartition> sources, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
         this.applicationId = applicationId;
         this.defaultPartition = defaultPartition;
@@ -110,6 +113,9 @@ public class ProcessorStateManager {
         return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
     }
 
+    /**
+     * @throws IOException if any error happens when locking the state directory
+     */
     public static FileLock lockStateDirectory(File stateDir) throws IOException {
         return lockStateDirectory(stateDir, 0);
     }
@@ -143,6 +149,11 @@ public class ProcessorStateManager {
         return this.baseDir;
     }
 
+    /**
+     * @throws IllegalArgumentException if the store name has already been registered or if it is not a valid name
+     * (e.g., when it conflicts with the names of internal topics, like the checkpoint file name)
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
     public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
         if (store.name().equals(CHECKPOINT_FILE_NAME))
             throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME);
@@ -313,6 +324,9 @@ public class ProcessorStateManager {
         }
     }
 
+    /**
+     * @throws IOException if any error happens when flushing or closing the state stores
+     */
     public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
         try {
             if (!stores.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
index 087cbd2..4e789fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
@@ -32,6 +32,9 @@ public class QuickUnion<T> {
         return ids.containsKey(id);
     }
 
+    /**
+     * @throws NoSuchElementException if the given id has no parent entry, i.e., it was never added to this union
+     */
     public T root(T id) {
         T current = id;
         T parent = ids.get(current);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 31a558b..e9c2760 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -40,6 +40,9 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
         this.partitioner = partitioner;
     }
 
+    /**
+     * @throws UnsupportedOperationException always, since a sink node cannot have any children
+     */
     @Override
     public void addChild(ProcessorNode<?, ?> child) {
         throw new UnsupportedOperationException("sink node does not allow addChild");

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 468fe74..d4b47e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -98,6 +98,9 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
         return metrics;
     }
 
+    /**
+     * @throws IllegalStateException if this method is called after the context has already been initialized
+     */
     @Override
     public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
         if (initialized)
@@ -106,51 +109,81 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
         stateMgr.register(store, loggingEnabled, stateRestoreCallback);
     }
 
+    /**
+     * @throws UnsupportedOperationException
+     */
     @Override
     public StateStore getStateStore(String name) {
         throw new UnsupportedOperationException("this should not happen: getStateStore() not supported in standby tasks.");
     }
 
+    /**
+     * @throws UnsupportedOperationException
+     */
     @Override
     public String topic() {
         throw new UnsupportedOperationException("this should not happen: topic() not supported in standby tasks.");
     }
 
+    /**
+     * @throws UnsupportedOperationException
+     */
     @Override
     public int partition() {
         throw new UnsupportedOperationException("this should not happen: partition() not supported in standby tasks.");
     }
 
+    /**
+     * @throws UnsupportedOperationException
+     */
     @Override
     public long offset() {
         throw new UnsupportedOperationException("this should not happen: offset() not supported in standby tasks.");
     }
 
+    /**
+     * @throws UnsupportedOperationException
+     */
     @Override
     public long timestamp() {
         throw new UnsupportedOperationException("this should not happen: timestamp() not supported in standby tasks.");
     }
 
+    /**
+     * @throws UnsupportedOperationException
+     */
     @Override
     public <K, V> void forward(K key, V value) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
 
+    /**
+     * @throws UnsupportedOperationException
+     */
     @Override
     public <K, V> void forward(K key, V value, int childIndex) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
 
+    /**
+     * @throws UnsupportedOperationException
+     */
     @Override
     public <K, V> void forward(K key, V value, String childName) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
 
+    /**
+     * @throws UnsupportedOperationException
+     */
     @Override
     public void commit() {
         throw new UnsupportedOperationException("this should not happen: commit() not supported in standby tasks.");
     }
 
+    /**
+     * @throws UnsupportedOperationException
+     */
     @Override
     public void schedule(long interval) {
         throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 1dd082d..bc42c82 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -96,6 +96,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
      * We need to have the PartitionAssignor and its StreamThread to be mutually accessible
      * since the former needs later's cached metadata while sending subscriptions,
      * and the latter needs former's returned assignment when adding tasks.
+     * @throws KafkaException if the stream thread is not specified
      */
     @Override
     public void configure(Map<String, ?> configs) {
@@ -382,6 +383,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         return assignment;
     }
 
+    /**
+     * @throws TaskAssignmentException if there is no task id for one of the partitions specified
+     */
     @Override
     public void onAssignment(Assignment assignment) {
         List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 53d0a8d..d9efb6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -212,6 +212,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
         return punctuationQueue.mayPunctuate(timestamp, this);
     }
 
+    /**
+     * @throws IllegalStateException if the current node is not null
+     */
     @Override
     public void punctuate(ProcessorNode node, long timestamp) {
         if (currNode != null)
@@ -280,6 +283,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * Schedules a punctuation for the processor
      *
      * @param interval  the interval in milliseconds
+     * @throws IllegalStateException if the current node is not null
      */
     public void schedule(long interval) {
         if (currNode == null)
@@ -288,6 +292,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
         punctuationQueue.schedule(new PunctuationSchedule(currNode, interval));
     }
 
+    /**
+     * @throws RuntimeException if an error happens during closing of processor nodes
+     */
     @Override
     public void close() {
         this.partitionGroup.close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 38dc356..f02683e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -239,6 +239,8 @@ public class StreamThread extends Thread {
 
     /**
      * Execute the stream processors
+     * @throws KafkaException for any Kafka-related exceptions
+     * @throws Exception for any other non-Kafka exceptions
      */
     @Override
     public void run() {
@@ -760,6 +762,9 @@ public class StreamThread extends Thread {
             sensor.record((endNs - startNs) / 1000000, endNs);
         }
 
+        /**
+         * @throws IllegalArgumentException if tags is not constructed in key-value pairs
+         */
         @Override
         public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
             // extract the additional tags if there are any

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index c2175bb..0486e57 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -54,6 +54,10 @@ public class AssignmentInfo {
         this.standbyTasks = standbyTasks;
     }
 
+    /**
+     * @throws TaskAssignmentException if method fails to encode the data, e.g., if there is an
+     * IO exception during encoding
+     */
     public ByteBuffer encode() {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream out = new DataOutputStream(baos);
@@ -96,6 +100,9 @@ public class AssignmentInfo {
         }
     }
 
+    /**
+     * @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown
+     */
     public static AssignmentInfo decode(ByteBuffer data) {
         // ensure we are at the beginning of the ByteBuffer
         data.rewind();

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index ccd2f73..874fea8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -49,6 +49,9 @@ public class SubscriptionInfo {
         this.standbyTasks = standbyTasks;
     }
 
+    /**
+     * @throws TaskAssignmentException if method fails to encode the data
+     */
     public ByteBuffer encode() {
         if (version == CURRENT_VERSION) {
             ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
@@ -78,6 +81,9 @@ public class SubscriptionInfo {
         }
     }
 
+    /**
+     * @throws TaskAssignmentException if method fails to decode the data
+     */
     public static SubscriptionInfo decode(ByteBuffer data) {
         // ensure we are at the beginning of the ByteBuffer
         data.rewind();

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index 9daac98..933bf72 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -46,6 +46,7 @@ public final class StateSerdes<K, V> {
      * @param stateName     the name of the state
      * @param keySerde      the serde for keys; cannot be null
      * @param valueSerde    the serde for values; cannot be null
+     * @throws IllegalArgumentException if key or value serde is null
      */
     @SuppressWarnings("unchecked")
     public StateSerdes(String stateName,

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 4e28187..9f1e53c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -50,6 +50,10 @@ public class Stores {
                                 return new InMemoryKeyValueFactory<K, V>() {
                                     private int capacity = Integer.MAX_VALUE;
 
+                                    /**
+                                     * @param capacity the maximum capacity of the in-memory cache; should be one less than a power of 2
+                                     * @throws IllegalArgumentException if the capacity of the store is zero or negative
+                                     */
                                     @Override
                                     public InMemoryKeyValueFactory<K, V> maxEntries(int capacity) {
                                         if (capacity < 1) throw new IllegalArgumentException("The capacity must be positive");

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index a859bd2..76dd744 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -124,11 +124,17 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
         return value;
     }
 
+    /**
+     * @throws UnsupportedOperationException
+     */
     @Override
     public KeyValueIterator<K, V> range(K from, K to) {
         throw new UnsupportedOperationException("MemoryLRUCache does not support range() function.");
     }
 
+    /**
+     * @throws UnsupportedOperationException
+     */
     @Override
     public KeyValueIterator<K, V> all() {
         throw new UnsupportedOperationException("MemoryLRUCache does not support all() function.");

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 853fc5d..ff17e68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -55,11 +55,17 @@ public class OffsetCheckpoint {
     private final File file;
     private final Object lock;
 
+    /**
+     * @throws IOException if an I/O error occurs
+     */
     public OffsetCheckpoint(File file) throws IOException {
         this.file = file;
         this.lock = new Object();
     }
 
+    /**
+     * @throws IOException if any file operation fails with an IO exception
+     */
     public void write(Map<TopicPartition, Long> offsets) throws IOException {
         synchronized (lock) {
             // write to temp file and then swap with the existing file
@@ -84,11 +90,17 @@ public class OffsetCheckpoint {
         }
     }
 
+    /**
+     * @throws IOException if any file write operation fails with an IO exception
+     */
     private void writeIntLine(BufferedWriter writer, int number) throws IOException {
         writer.write(Integer.toString(number));
         writer.newLine();
     }
 
+    /**
+     * @throws IOException if any file write operation fails with an IO exception
+     */
     private void writeEntry(BufferedWriter writer, TopicPartition part, long offset) throws IOException {
         writer.write(part.topic());
         writer.write(' ');
@@ -98,6 +110,11 @@ public class OffsetCheckpoint {
         writer.newLine();
     }
 
+
+    /**
+     * @throws IOException if any file operation fails with an IO exception
+     * @throws IllegalArgumentException if the offset checkpoint version is unknown
+     */
     public Map<TopicPartition, Long> read() throws IOException {
         synchronized (lock) {
             BufferedReader reader;
@@ -141,6 +158,9 @@ public class OffsetCheckpoint {
         }
     }
 
+    /**
+     * @throws IOException if file read ended prematurely
+     */
     private int readInt(BufferedReader reader) throws IOException {
         String line = reader.readLine();
         if (line == null)
@@ -148,6 +168,9 @@ public class OffsetCheckpoint {
         return Integer.parseInt(line);
     }
 
+    /**
+     * @throws IOException if there is any IO exception during delete
+     */
     public void delete() throws IOException {
         file.delete();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index fe327f6..944d408 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -397,6 +397,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         flushInternal();
     }
 
+    /**
+     * @throws ProcessorStateException if flushing failed because of any internal store exceptions
+     */
     public void flushInternal() {
         try {
             db.flush(fOptions);
@@ -433,6 +436,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             return iter.isValid();
         }
 
+        /**
+         * @throws NoSuchElementException if no next element exists
+         */
         @Override
         public KeyValue<K, V> next() {
             if (!hasNext())
@@ -443,6 +449,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             return entry;
         }
 
+        /**
+         * @throws UnsupportedOperationException
+         */
         @Override
         public void remove() {
             throw new UnsupportedOperationException("RocksDB iterator does not support remove");

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba9456de/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 9851c04..5955d21 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -86,6 +86,9 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
             return false;
         }
 
+        /**
+         * @throws NoSuchElementException if no next element exists
+         */
         @Override
         public KeyValue<Long, V> next() {
             if (index >= iterators.length)


Mime
View raw message