kafka-commits mailing list archives

From damian...@apache.org
Subject [1/2] kafka-site git commit: MINOR: add Interactive Queries docs
Date Fri, 07 Jul 2017 13:21:55 GMT
Repository: kafka-site
Updated Branches:
  refs/heads/asf-site a6523f9ee -> 422095a8f


http://git-wip-us.apache.org/repos/asf/kafka-site/blob/422095a8/0110/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/0110/streams/developer-guide.html b/0110/streams/developer-guide.html
index 95961f1..6c75172 100644
--- a/0110/streams/developer-guide.html
+++ b/0110/streams/developer-guide.html
@@ -21,13 +21,13 @@
     <h1>Developer Guide</h1>
 
     <p>
-    There is a <a href="/{{version}}/documentation/#quickstart_kafkastreams">quickstart</a> example that provides how to run a stream processing program coded in the Kafka Streams library.
-    This section focuses on how to write, configure, and execute a Kafka Streams application.
+        There is a <a href="/{{version}}/documentation/#quickstart_kafkastreams">quickstart</a> example that demonstrates how to run a stream processing program written with the Kafka Streams library.
+        This section focuses on how to write, configure, and execute a Kafka Streams application.
     </p>
 
     <p>
-    As we have mentioned above, the computational logic of a Kafka Streams application is defined as a <a href="/{{version}}/documentation/streams/core-concepts#streams_topology">processor topology</a>.
-    Currently Kafka Streams provides two sets of APIs to define the processor topology, which will be described in the subsequent sections.
+        As we have mentioned above, the computational logic of a Kafka Streams application is defined as a <a href="/{{version}}/documentation/streams/core-concepts#streams_topology">processor topology</a>.
+        Currently, Kafka Streams provides two sets of APIs to define the processor topology, which are described in the subsequent sections.
     </p>
 
     <h3><a id="streams_processor" href="#streams_processor">Low-Level Processor API</a></h3>
@@ -35,23 +35,23 @@
     <h4><a id="streams_processor_process" href="#streams_processor_process">Processor</a></h4>
 
     <p>
-    As mentioned in the <a href="/{{version}}/documentation/streams/core-concepts"><b>Core Concepts</b></a> section, a stream processor is a node in the processor topology that represents a single processing step.
-    With the <code>Processor</code> API developers can define arbitrary stream processors that process one received record at a time, and connect these processors with
-    their associated state stores to compose the processor topology that represents their customized processing logic.
+        As mentioned in the <a href="/{{version}}/documentation/streams/core-concepts"><b>Core Concepts</b></a> section, a stream processor is a node in the processor topology that represents a single processing step.
+        With the <code>Processor</code> API, developers can define arbitrary stream processors that process one received record at a time, and connect these processors with
+        their associated state stores to compose the processor topology that represents their customized processing logic.
     </p>
 
     <p>
-    The <code>Processor</code> interface provides two main API methods:
-    <code>process</code> and <code>punctuate</code>. The <code>process</code> method is performed on each
-    of the received record; and the <code>punctuate</code> method is performed periodically based on elapsed time.
-    In addition, the processor can maintain the current <code>ProcessorContext</code> instance variable initialized in the
-    <code>init</code> method, and use the context to schedule the punctuation period (<code>context().schedule</code>), to
-    forward the modified / new key-value pair to downstream processors (<code>context().forward</code>), to commit the current
-    processing progress (<code>context().commit</code>), etc.
+        The <code>Processor</code> interface provides two main API methods:
+        <code>process</code> and <code>punctuate</code>. The <code>process</code> method is called on each
+        received record, while the <code>punctuate</code> method is called periodically based on elapsed time.
+        In addition, the processor can keep the <code>ProcessorContext</code> instance received in its
+        <code>init</code> method, and use the context to schedule the punctuation period (<code>context().schedule</code>), to
+        forward modified or new key-value pairs to downstream processors (<code>context().forward</code>), to commit the current
+        processing progress (<code>context().commit</code>), etc.
     </p>
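+
+    <p>
+        For orientation, a minimal <code>Processor</code> skeleton illustrating this lifecycle might look as follows (the store name, the punctuation interval, and the method bodies are illustrative):
+    </p>
+
+    <pre class="brush: java;">
+          public class MyProcessor implements Processor&lt;String, String&gt; {
+
+            private ProcessorContext context;
+            private KeyValueStore&lt;String, Long&gt; kvStore;
+
+            @Override
+            @SuppressWarnings("unchecked")
+            public void init(ProcessorContext context) {
+              this.context = context;
+              // Schedule a punctuate() call every 1000 milliseconds
+              this.context.schedule(1000);
+              // Retrieve the state store named "Counts" attached to this processor
+              this.kvStore = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("Counts");
+            }
+
+            @Override
+            public void process(String key, String value) {
+              // Called once for each received record, e.g. to update the store
+            }
+
+            @Override
+            public void punctuate(long timestamp) {
+              // Called periodically; e.g. forward results downstream via
+              // context.forward(...) and commit the current progress
+              context.commit();
+            }
+
+            @Override
+            public void close() {
+              // Close any resources opened in init(); the store itself is managed by Kafka Streams
+            }
+          }
+        </pre>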
 
     <p>
-    The following example <code>Processor</code> implementation defines a simple word-count algorithm:
+        The following example <code>Processor</code> implementation defines a simple word-count algorithm:
     </p>
 
     <pre class="brush: java;">
@@ -111,7 +111,7 @@
     </pre>
 
     <p>
-    In the above implementation, the following actions are performed:
+        In the above implementation, the following actions are performed:
     </p>
 
     <ul>
@@ -124,8 +124,8 @@
     <h4><a id="streams_processor_topology" href="#streams_processor_topology">Processor Topology</a></h4>
 
     <p>
-    With the customized processors defined in the Processor API, developers can use the <code>TopologyBuilder</code> to build a processor topology
-    by connecting these processors together:
+        With the customized processors defined in the Processor API, developers can use the <code>TopologyBuilder</code> to build a processor topology
+        by connecting these processors together:
     </p>
 
     <pre class="brush: java;">
@@ -165,11 +165,11 @@
     <h4><a id="streams_processor_statestore" href="#streams_processor_statestore">State Stores</a></h4>
 
     <p>
-    Note that the <code>Processor</code> API is not limited to only accessing the current records as they arrive in the <code>process()</code> method, but can also maintain processing states
-    that keep recently arrived records to use in stateful processing operations such as windowed joins or aggregation.
-    To take advantage of these states, users can define a state store by implementing the <code>StateStore</code> interface (the Kafka Streams library also has a few extended interfaces such as <code>KeyValueStore</code>);
-    in practice, though, users usually do not need to customize such a state store from scratch but can simply use the <code>Stores</code> factory to define a state store by specifying whether it should be persistent, log-backed, etc.
-    In the following example, a persistent key-value store named "Counts" with key type <code>String</code> and value type <code>Long</code> is created.
+        Note that the <code>Processor</code> API is not limited to accessing the current record as it arrives in the <code>process()</code> method; it can also maintain state
+        that keeps recently arrived records for use in stateful processing operations such as windowed joins or aggregations.
+        To take advantage of these states, users can define a state store by implementing the <code>StateStore</code> interface (the Kafka Streams library also has a few extended interfaces such as <code>KeyValueStore</code>);
+        in practice, though, users usually do not need to customize such a state store from scratch but can simply use the <code>Stores</code> factory to define a state store by specifying whether it should be persistent, log-backed, etc.
+        In the following example, a persistent key-value store named "Counts" with key type <code>String</code> and value type <code>Long</code> is created.
     </p>
 
     <pre class="brush: java;">
@@ -181,9 +181,9 @@
     </pre>
 
     <p>
-    To take advantage of these state stores, developers can use the <code>TopologyBuilder.addStateStore</code> method when building the
-    processor topology to create the local state and associate it with the processor nodes that needs to access it; or they can connect a created
-    state store with the existing processor nodes through <code>TopologyBuilder.connectProcessorAndStateStores</code>.
+        To take advantage of these state stores, developers can use the <code>TopologyBuilder.addStateStore</code> method when building the
+        processor topology to create the local state and associate it with the processor nodes that need to access it; or they can connect a created
+        state store with the existing processor nodes through <code>TopologyBuilder.connectProcessorAndStateStores</code>.
     </p>
 
     <pre class="brush: java;">
@@ -218,35 +218,35 @@
     <h4><a id="streams_duality" href="#streams_duality">Duality of Streams and Tables</a></h4>
 
     <p>
-    Before we discuss concepts such as aggregations in Kafka Streams we must first introduce tables, and most importantly the relationship between tables and streams:
-    the so-called <a href="https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying/">stream-table duality</a>.
-    Essentially, this duality means that a stream can be viewed as a table, and vice versa. Kafka's log compaction feature, for example, exploits this duality.
+        Before we discuss concepts such as aggregations in Kafka Streams we must first introduce tables, and most importantly the relationship between tables and streams:
+        the so-called <a href="https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying/">stream-table duality</a>.
+        Essentially, this duality means that a stream can be viewed as a table, and vice versa. Kafka's log compaction feature, for example, exploits this duality.
     </p>
 
     <p>
-    A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows:
+        A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows:
     </p>
     <img class="centered" src="/{{version}}/images/streams-table-duality-01.png">
 
     The <b>stream-table duality</b> describes the close relationship between streams and tables.
     <ul>
-    <li><b>Stream as Table</b>: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a "real" table by replaying the changelog from beginning to end to reconstruct the table. Similarly, in a more general analogy, aggregating data records in a stream - such as computing the total number of pageviews by user from a stream of pageview events - will return a table (here with the key and the value being the user and its corresponding pageview count, respectively).</li>
-    <li><b>Table as Stream</b>: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream's data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a "real" stream by iterating over each key-value entry in the table.</li>
+        <li><b>Stream as Table</b>: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a "real" table by replaying the changelog from beginning to end to reconstruct the table. Similarly, in a more general analogy, aggregating data records in a stream - such as computing the total number of pageviews by user from a stream of pageview events - will return a table (here with the key and the value being the user and their corresponding pageview count, respectively).</li>
+        <li><b>Table as Stream</b>: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream's data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a "real" stream by iterating over each key-value entry in the table.</li>
     </ul>
 
     <p>
-    Let's illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time - and different revisions of the table - can be represented as a changelog stream (second column).
+        Let's illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time - and different revisions of the table - can be represented as a changelog stream (second column).
     </p>
     <img class="centered" src="/{{version}}/images/streams-table-duality-02.png" style="width:300px">
 
     <p>
-    Interestingly, because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):
+        Interestingly, because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):
     </p>
     <img class="centered" src="/{{version}}/images/streams-table-duality-03.png" style="width:600px">
 
     <p>
-    The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault-tolerance.
-    The stream-table duality is such an important concept that Kafka Streams models it explicitly via the <a href="#streams_kstream_ktable">KStream, KTable, and GlobalKTable</a> interfaces, which we describe in the next sections.
+        The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault-tolerance.
+        The stream-table duality is such an important concept that Kafka Streams models it explicitly via the <a href="#streams_kstream_ktable">KStream, KTable, and GlobalKTable</a> interfaces, which we describe in the next sections.
     </p>
 
     <h5><a id="streams_kstream_ktable" href="#streams_kstream_ktable">KStream, KTable, and GlobalKTable</a></h5>
@@ -269,9 +269,9 @@
     <h4><a id="streams_dsl_source" href="#streams_dsl_source">Create Source Streams from Kafka</a></h4>
 
     <p>
-    Either a <b>record stream</b> (defined as <code>KStream</code>) or a <b>changelog stream</b> (defined as <code>KTable</code> or <code>GlobalKTable</code>)
-    can be created as a source stream from one or more Kafka topics (for <code>KTable</code> and <code>GlobalKTable</code> you can only create the source stream
-    from a single topic).
+        Either a <b>record stream</b> (defined as <code>KStream</code>) or a <b>changelog stream</b> (defined as <code>KTable</code> or <code>GlobalKTable</code>)
+        can be created as a source stream from one or more Kafka topics (for <code>KTable</code> and <code>GlobalKTable</code> you can only create the source stream
+        from a single topic).
     </p>
 
     <pre class="brush: java;">
@@ -285,42 +285,42 @@
     <h4><a id="streams_dsl_windowing" href="#streams_dsl_windowing">Windowing a stream</a></h4>
     A stream processor may need to divide data records into time buckets, i.e. to <b>window</b> the stream by time. This is usually needed for join and aggregation operations, etc. Kafka Streams currently defines the following types of windows:
     <ul>
-    <li><b>Hopping time windows</b> are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two properties: the window's size and its advance interval (aka "hop"). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap a data record may belong to more than one such windows.</li>
-    <li><b>Tumbling time windows</b> are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window's size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.</li>
-    <li><b>Sliding windows</b> model a fixed-size window that slides continuously over the time axis; here, two data records are said to be included in the same window if the difference of their timestamps is within the window size. Thus, sliding windows are not aligned to the epoch, but on the data record timestamps. In Kafka Streams, sliding windows are used only for join operations, and can be specified through the <code>JoinWindows</code> class.</li>
-    <li><b>Session windows</b> are used to aggregate key-based events into sessions.
-        Sessions represent a period of activity separated by a defined gap of inactivity.
-        Any events processed that fall within the inactivity gap of any existing sessions are merged into the existing sessions.
-        If the event falls outside of the session gap, then a new session will be created.
-        Session windows are tracked independently across keys (e.g. windows of different keys typically have different start and end times) and their sizes vary (even windows for the same key typically have different sizes);
-        as such session windows can't be pre-computed and are instead derived from analyzing the timestamps of the data records.
-    </li>
+        <li><b>Hopping time windows</b> are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two properties: the window's size and its advance interval (aka "hop"). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap, a data record may belong to more than one such window.</li>
+        <li><b>Tumbling time windows</b> are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window's size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.</li>
+        <li><b>Sliding windows</b> model a fixed-size window that slides continuously over the time axis; here, two data records are said to be included in the same window if the difference of their timestamps is within the window size. Thus, sliding windows are not aligned to the epoch, but to the data record timestamps. In Kafka Streams, sliding windows are used only for join operations, and can be specified through the <code>JoinWindows</code> class.</li>
+        <li><b>Session windows</b> are used to aggregate key-based events into sessions.
+            Sessions represent a period of activity separated by a defined gap of inactivity.
+            Any events processed that fall within the inactivity gap of any existing sessions are merged into the existing sessions.
+            If the event falls outside of the session gap, then a new session will be created.
+            Session windows are tracked independently across keys (e.g. windows of different keys typically have different start and end times) and their sizes vary (even windows for the same key typically have different sizes);
+            as such, session windows can't be pre-computed and are instead derived by analyzing the timestamps of the data records.
+        </li>
     </ul>
 
     <p>
-    In the Kafka Streams DSL users can specify a <b>retention period</b> for the window. This allows Kafka Streams to retain old window buckets for a period of time in order to wait for the late arrival of records whose timestamps fall within the window interval.
-    If a record arrives after the retention period has passed, the record cannot be processed and is dropped.
+        In the Kafka Streams DSL, users can specify a <b>retention period</b> for the window. This allows Kafka Streams to retain old window buckets for a period of time in order to wait for the late arrival of records whose timestamps fall within the window interval.
+        If a record arrives after the retention period has passed, the record cannot be processed and is dropped.
     </p>
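+
+    <p>
+        For illustration, here is a sketch of how these window types and the retention period can be specified in the DSL (the window sizes are arbitrary examples):
+    </p>
+
+    <pre class="brush: java;">
+          // Hopping time window: size of 5 minutes, advancing by 1 minute
+          TimeWindows.of(TimeUnit.MINUTES.toMillis(5)).advanceBy(TimeUnit.MINUTES.toMillis(1));
+
+          // Tumbling time window: size of 5 minutes (advance interval == size)
+          TimeWindows.of(TimeUnit.MINUTES.toMillis(5));
+
+          // Sliding window for joins: records at most 5 minutes apart can join
+          JoinWindows.of(TimeUnit.MINUTES.toMillis(5));
+
+          // Session window: a session closes after 5 minutes of inactivity
+          SessionWindows.with(TimeUnit.MINUTES.toMillis(5));
+
+          // Retention period: keep window buckets for 1 hour to accept late records
+          TimeWindows.of(TimeUnit.MINUTES.toMillis(5)).until(TimeUnit.HOURS.toMillis(1));
+        </pre>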
 
     <p>
-    Late-arriving records are always possible in real-time data streams. However, it depends on the effective <a href="/{{version}}/documentation/streams/core-concepts#streams_time">time semantics</a> how late records are handled. Using processing-time, the semantics are "when the data is being processed",
-    which means that the notion of late records is not applicable as, by definition, no record can be late. Hence, late-arriving records only really can be considered as such (i.e. as arriving "late") for event-time or ingestion-time semantics. In both cases,
-    Kafka Streams is able to properly handle late-arriving records.
+        Late-arriving records are always possible in real-time data streams. However, how late records are handled depends on the effective <a href="/{{version}}/documentation/streams/core-concepts#streams_time">time semantics</a>. With processing-time semantics ("when the data is being processed"),
+        the notion of late records is not applicable since, by definition, no record can be late. Hence, records can only really be considered late-arriving under event-time or ingestion-time semantics. In both cases,
+        Kafka Streams is able to properly handle late-arriving records.
     </p>
 
     <h4><a id="streams_dsl_joins" href="#streams_dsl_joins">Join multiple streams</a></h4>
     A <b>join</b> operation merges two streams based on the keys of their data records, and yields a new stream. A join over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the join may grow indefinitely. In Kafka Streams, you may perform the following join operations:
     <ul>
-    <li><b>KStream-to-KStream Joins</b> are always windowed joins, since otherwise the memory and state required to compute the join would grow infinitely in size. Here, a newly received record from one of the streams is joined with the other stream's records within the specified window interval to produce one result for each matching pair based on user-provided <code>ValueJoiner</code>. A new <code>KStream</code> instance representing the result stream of the join is returned from this operator.</li>
-
-    <li><b>KTable-to-KTable Joins</b> are join operations designed to be consistent with the ones in relational databases. Here, both changelog streams are materialized into local state stores first. When a new record is received from one of the streams, it is joined with the other stream's materialized state stores to produce one result for each matching pair based on user-provided ValueJoiner. A new <code>KTable</code> instance representing the result stream of the join, which is also a changelog stream of the represented table, is returned from this operator.</li>
-    <li><b>KStream-to-KTable Joins</b> allow you to perform table lookups against a changelog stream (<code>KTable</code>) upon receiving a new record from another record stream (<code>KStream</code>). An example use case would be to enrich a stream of user activities (<code>KStream</code>) with the latest user profile information (<code>KTable</code>). Only records received from the record stream will trigger the join and produce results via <code>ValueJoiner</code>, not vice versa (i.e., records received from the changelog stream will be used only to update the materialized state store). A new <code>KStream</code> instance representing the result stream of the join is returned from this operator.</li>
-    <li><b>KStream-to-GlobalKTable Joins</b> allow you to perform table lookups against a fully replicated changelog stream (<code>GlobalKTable</code>) upon receiving a new record from another record stream (<code>KStream</code>).
-        Joins with a <code>GlobalKTable</code> don't require repartitioning of the input <code>KStream</code> as all partitions of the <code>GlobalKTable</code> are available on every KafkaStreams instance.
-        The <code>KeyValueMapper</code> provided with the join operation is applied to each KStream record to extract the join-key that is used to do the lookup to the GlobalKTable so non-record-key joins are possible.
-        An example use case would be to enrich a stream of user activities (<code>KStream</code>) with the latest user profile information (<code>GlobalKTable</code>).
-        Only records received from the record stream will trigger the join and produce results via <code>ValueJoiner</code>, not vice versa (i.e., records received from the changelog stream will be used only to update the materialized state store).
-        A new <code>KStream</code> instance representing the result stream of the join is returned from this operator.</li>
+        <li><b>KStream-to-KStream Joins</b> are always windowed joins, since otherwise the memory and state required to compute the join would grow infinitely in size. Here, a newly received record from one of the streams is joined with the other stream's records within the specified window interval to produce one result for each matching pair based on user-provided <code>ValueJoiner</code>. A new <code>KStream</code> instance representing the result stream of the join is returned from this operator.</li>
+
+        <li><b>KTable-to-KTable Joins</b> are join operations designed to be consistent with the ones in relational databases. Here, both changelog streams are materialized into local state stores first. When a new record is received from one of the streams, it is joined with the other stream's materialized state stores to produce one result for each matching pair based on the user-provided <code>ValueJoiner</code>. A new <code>KTable</code> instance representing the result stream of the join, which is also a changelog stream of the represented table, is returned from this operator.</li>
+        <li><b>KStream-to-KTable Joins</b> allow you to perform table lookups against a changelog stream (<code>KTable</code>) upon receiving a new record from another record stream (<code>KStream</code>). An example use case would be to enrich a stream of user activities (<code>KStream</code>) with the latest user profile information (<code>KTable</code>). Only records received from the record stream will trigger the join and produce results via <code>ValueJoiner</code>, not vice versa (i.e., records received from the changelog stream will be used only to update the materialized state store). A new <code>KStream</code> instance representing the result stream of the join is returned from this operator.</li>
+        <li><b>KStream-to-GlobalKTable Joins</b> allow you to perform table lookups against a fully replicated changelog stream (<code>GlobalKTable</code>) upon receiving a new record from another record stream (<code>KStream</code>).
+            Joins with a <code>GlobalKTable</code> don't require repartitioning of the input <code>KStream</code> because all partitions of the <code>GlobalKTable</code> are available on every <code>KafkaStreams</code> instance.
+            The <code>KeyValueMapper</code> provided with the join operation is applied to each <code>KStream</code> record to extract the join key used for the lookup against the <code>GlobalKTable</code>, so non-record-key joins are possible.
+            An example use case would be to enrich a stream of user activities (<code>KStream</code>) with the latest user profile information (<code>GlobalKTable</code>).
+            Only records received from the record stream will trigger the join and produce results via <code>ValueJoiner</code>, not vice versa (i.e., records received from the changelog stream will be used only to update the materialized state store).
+            A new <code>KStream</code> instance representing the result stream of the join is returned from this operator.</li>
     </ul>
 
     Depending on the operands the following join operations are supported: <b>inner joins</b>, <b>outer joins</b> and <b>left joins</b>.
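+
+    <p>
+        As a sketch, a KStream-to-KTable join and a KStream-to-GlobalKTable join might look as follows (stream names, types, and the <code>extractRegionId</code> helper are illustrative):
+    </p>
+
+    <pre class="brush: java;">
+          KStream&lt;String, String&gt; userClicks = ...;
+          KTable&lt;String, String&gt; userProfiles = ...;
+          GlobalKTable&lt;String, String&gt; userRegions = ...;
+
+          // KStream-to-KTable left join: each click record is enriched with the
+          // latest profile for the same user key (profile is null if none exists yet)
+          KStream&lt;String, String&gt; enrichedClicks = userClicks.leftJoin(userProfiles,
+              (click, profile) -> click + ", profile=" + profile);
+
+          // KStream-to-GlobalKTable join: the KeyValueMapper extracts the join key
+          // from each stream record, so non-record-key joins are possible
+          KStream&lt;String, String&gt; clicksWithRegion = userClicks.join(userRegions,
+              (userId, click) -> extractRegionId(click), // hypothetical helper
+              (click, region) -> click + ", region=" + region);
+        </pre>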
@@ -330,27 +330,27 @@
     An <b>aggregation</b> operation takes one input stream, and yields a new stream by combining multiple input records into a single output record. Examples of aggregations are computing counts or sum. An aggregation over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the aggregation may grow indefinitely.
 
     <p>
-    In the Kafka Streams DSL, an input stream of an aggregation can be a <code>KStream</code> or a <code>KTable</code>, but the output stream will always be a <code>KTable</code>.
-    This allows Kafka Streams to update an aggregate value upon the late arrival of further records after the value was produced and emitted.
-    When such late arrival happens, the aggregating <code>KStream</code> or <code>KTable</code> simply emits a new aggregate value. Because the output is a <code>KTable</code>, the new value is considered to overwrite the old value with the same key in subsequent processing steps.
+        In the Kafka Streams DSL, an input stream of an aggregation can be a <code>KStream</code> or a <code>KTable</code>, but the output stream will always be a <code>KTable</code>.
+        This allows Kafka Streams to update an aggregate value upon the late arrival of further records after the value was produced and emitted.
+        When such a late arrival happens, the aggregating <code>KStream</code> or <code>KTable</code> simply emits a new aggregate value. Because the output is a <code>KTable</code>, the new value is considered to overwrite the old value with the same key in subsequent processing steps.
     </p>
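+
+    <p>
+        For example, a windowed count over a record stream might be expressed as follows (stream and store names are illustrative):
+    </p>
+
+    <pre class="brush: java;">
+          KStream&lt;String, String&gt; pageViews = ...;
+
+          // Count page views per user in 5-minute tumbling windows; the result is a
+          // KTable that emits an updated aggregate value when late records arrive
+          KTable&lt;Windowed&lt;String&gt;, Long&gt; viewCounts = pageViews
+              .groupByKey()
+              .count(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), "PageViewCounts");
+        </pre>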
 
     <h4><a id="streams_dsl_transform" href="#streams_dsl_transform">Transform a stream</a></h4>
 
     <p>
-    Besides join and aggregation operations, there is a list of other transformation operations provided for <code>KStream</code> and <code>KTable</code> respectively.
-    Each of these operations may generate either one or more <code>KStream</code> and <code>KTable</code> objects and
-    can be translated into one or more connected processors into the underlying processor topology.
-    All these transformation methods can be chained together to compose a complex processor topology.
-    Since <code>KStream</code> and <code>KTable</code> are strongly typed, all these transformation operations are defined as
-    generics functions where users could specify the input and output data types.
+        Besides join and aggregation operations, there is a list of other transformation operations provided for <code>KStream</code> and <code>KTable</code> respectively.
+        Each of these operations may generate one or more <code>KStream</code> and <code>KTable</code> objects and
+        can be translated into one or more connected processors in the underlying processor topology.
+        All these transformation methods can be chained together to compose a complex processor topology.
+        Since <code>KStream</code> and <code>KTable</code> are strongly typed, all these transformation operations are defined as
+        generic functions where users can specify the input and output data types.
     </p>
 
     <p>
-    Among these transformations, <code>filter</code>, <code>map</code>, <code>mapValues</code>, etc, are stateless
-    transformation operations and can be applied to both <code>KStream</code> and <code>KTable</code>,
-    where users can usually pass a customized function to these functions as a parameter, such as <code>Predicate</code> for <code>filter</code>,
-    <code>KeyValueMapper</code> for <code>map</code>, etc:
+        Among these transformations, <code>filter</code>, <code>map</code>, <code>mapValues</code>, etc., are stateless
+        transformation operations that can be applied to both <code>KStream</code> and <code>KTable</code>,
+        and for which users can usually pass a customized function as a parameter, such as <code>Predicate</code> for <code>filter</code> and
+        <code>KeyValueMapper</code> for <code>map</code>:
 
     </p>
 
@@ -360,12 +360,12 @@
     </pre>
 
     <p>
-    Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise
-    they do not require a state store associated with the stream processor; Stateful transformations, on the other hand,
-    require accessing an associated state for processing and producing outputs.
-    For example, in <code>join</code> and <code>aggregate</code> operations, a windowing state is usually used to store all the received records
-    within the defined window boundary so far. The operators can then access these accumulated records in the store and compute
-    based on them.
+        Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise
+        they do not require a state store associated with the stream processor; stateful transformations, on the other hand,
+        require accessing an associated state for processing and producing outputs.
+        For example, in <code>join</code> and <code>aggregate</code> operations, a windowing state is usually used to store all the records received
+        so far within the defined window boundary. The operators can then access these accumulated records in the store and compute
+        based on them.
     </p>
 
     <pre class="brush: java;">
@@ -385,8 +385,8 @@
     <h4><a id="streams_dsl_sink" href="#streams_dsl_sink">Write streams back to Kafka</a></h4>
 
     <p>
-    At the end of the processing, users can choose to (continuously) write the final resulted streams back to a Kafka topic through
-    <code>KStream.to</code> and <code>KTable.to</code>.
+        At the end of the processing, users can choose to (continuously) write the resulting streams back to a Kafka topic through
+        <code>KStream.to</code> and <code>KTable.to</code>.
     </p>
 
     <pre class="brush: java;">
@@ -406,13 +406,531 @@
     </pre>
     <br>
 
+    <h3><a id="streams_interactive_queries" href="#streams_interactive_queries">Interactive Queries</a></h3>
+    <p>
+        Interactive queries let you get more from streaming than just the processing of data. This feature allows you to treat the stream processing layer as a lightweight embedded database and, more concretely, <i>to directly query the latest state</i> of your stream processing application, without needing to materialize that state to external databases or external storage first.
+        As a result, interactive queries simplify the architecture of many use cases and lead to more application-centric architectures.  For example, you often no longer need to operate and interface with a separate database cluster -- or a separate infrastructure team in your company that runs that cluster -- to share data between a Kafka Streams application (say, an event-driven microservice) and downstream applications, regardless of whether these applications use Kafka Streams or not; they may even be applications that do not run on the JVM, e.g. implemented in Python, C/C++, or JavaScript.
+        The following diagrams juxtapose two architectures: the first does not use interactive queries whereas the second one does.  Which architecture is the better fit depends on the concrete use case -- the important takeaway is that Kafka Streams and interactive queries give you the flexibility to pick and to compose the right one, rather than limiting you to just a single way.
+    </p>
+
+
+    <figure>
+        <img class="centerd" src="/{{version}}/images/streams-interactive-queries-01.png" style="width:600pt;">
+        <figcaption style="text-align: center;"><i>Without interactive queries: increased complexity and heavier footprint of architecture</i></figcaption>
+    </figure>
+
+
+    <figure>
+        <img class="centered" src="/{{version}}/images/streams-interactive-queries-02.png" style="width:500pt;">
+        <figcaption style="text-align: center;"><i>With interactive queries: simplified, more application-centric architecture</i></figcaption>
+    </figure>
+
+    <p>
+        Here are some use case examples for applications that benefit from interactive queries:
+    </p>
+    <ul>
+        <li>Real-time monitoring:  A front-end dashboard that provides threat intelligence (e.g., web servers currently
+            under attack by cyber criminals) can directly query a Kafka Streams application that continuously generates the
+            relevant information by processing network telemetry data in real-time.
+        </li>
+        <li>Video gaming:  A Kafka Streams application continuously tracks location updates from players in the gaming universe.
+            A mobile companion app can then directly query the Kafka Streams application to show the current location of a player
+            to friends and family, and invite them to come along.  Similarly, the game vendor can use the data to identify unusual
+            hotspots of players, which may indicate a bug or an operational issue.
+        </li>
+        <li>Risk and fraud:  A Kafka Streams application continuously analyzes user transactions for anomalies and suspicious
+            behavior.  An online banking application can directly query the Kafka Streams application when a user logs in to deny
+            access to those users that have been flagged as suspicious.
+        </li>
+        <li>Trend detection:  A Kafka Streams application continuously computes the latest top charts across music genres based on
+            user listening behavior that is collected in real-time.  Mobile or desktop applications of a music store can then
+            interactively query for the latest charts while users are browsing the store.
+        </li>
+    </ul>
+
+    <h4><a id="treams_developer-guide_interactive-queries_your_app" href="#treams_developer-guide_interactive-queries_your_app">Your application and interactive queries</a></h4>
+    <p>
+        Interactive queries allow you to tap into the <i>state</i> of your application, and notably to do that from outside your application.
+        However, an application is not interactively queryable out of the box: you make it queryable by leveraging the API of Kafka Streams.
+    </p>
+
+    <p>
+        It is important to understand that the state of your application -- to be extra clear, we might call it "the full state of the entire application" -- is typically split across many distributed instances of your application, and thus across many state stores that are managed locally by these application instances.
+    </p>
+
+    <img class="centered" src="/{{version}}/images/streams-interactive-queries-03.png" style="width:400pt; height:400pt;">
+
+    <p>
+        Accordingly, the API to let you interactively query your application's state has two parts, a <i>local</i> and a <i>remote</i> one:
+    </p>
+
+    <ol>
+        <li><a href="#streams_developer-guide_interactive-queries_local-stores">Querying local state stores (for an application instance)</a>:  You can query that (part of the full) state that is managed locally by an instance of your application.  Here, an application instance can directly query its own local state stores.  You can thus use the corresponding (local) data in other parts of your application code that are not related to calling the Kafka Streams API.  Querying state stores is always *read-only* to guarantee that the underlying state stores   will never be mutated out-of-band, e.g. you cannot add new entries; state stores should only ever be mutated by the corresponding processor topology and the input data it operates on.
+        </li>
+        <li><a href="#streams_developer-guide_interactive-queries_discovery">Querying remote state stores (for the entire application)</a>:  To query the full state of your entire application we must be able to piece together the various local fragments of the state.  In addition to being able to (a) query local state stores as described in the previous bullet point, we also need to (b) discover all the running instances of your application in the network, including their respective state stores and (c) have a way to communicate with these instances over the network, i.e. an RPC layer.  Collectively, these building blocks enable intra-app communcation (between instances of the same app) as well as inter-app communication (from other applications) for interactive queries.
+        </li>
+    </ol>
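+
+    <p>
+        In code, the two parts correspond roughly to the following calls; this is a sketch that assumes a running <code>KafkaStreams</code> instance named <code>streams</code> and an illustrative store name (both APIs are covered in detail in the following sections):
+    </p>
+
+    <pre class="brush: java;">
+          // Part 1: query a state store that is local to this application instance
+          ReadOnlyKeyValueStore&lt;String, Long&gt; localStore =
+              streams.store("word-counts", QueryableStoreTypes.keyValueStore());
+
+          // Part 2: discover all running instances of this application and their
+          // state stores; communicating with them is left to a user-provided RPC layer
+          for (StreamsMetadata metadata : streams.allMetadata()) {
+            System.out.println("Instance " + metadata.host() + ":" + metadata.port()
+                + " hosts stores " + metadata.stateStoreNames());
+          }
+        </pre>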
+
+    <table class="data-table">
+        <tbody>
+        <tr>
+            <th>Required to access the state of ...</th>
+            <th>... an app instance (local state)</th>
+            <th>... the entire application (full state)</th>
+        </tr>
+        <tr>
+            <td>Query local state stores of an app instance</td><td>Required (but already built-in)</td><td>Required (but already built-in)</td>
+        </tr>
+        <tr>
+            <td>Make an app instance discoverable to others</td><td>Not needed</td><td>Required (but already built-in)</td>
+        </tr>
+        <tr>
+            <td>Discover all running app instances and their state stores</td><td>Not needed</td><td>Required (but already built-in)</td>
+        </tr>
+        <tr>
+            <td>Communicate with app instances over the network (RPC)</td><td>Not needed</td><td>Required (<b>user must provide</b>)</td>
+        </tr>
+        </tbody>
+    </table>
+
+    <p>
+        Kafka Streams provides all the required functionality for interactively querying your application's state out of the box, with but one exception:  if you want to expose your application's full state via interactive queries, then --
+        for reasons we explain further down below -- it is your responsibility to add an appropriate RPC layer (such as a REST
+        API) to your application that allows application instances to communicate over the network.  If, however, you only need
+        to let your application instances access their own local state, then you do not need to add such an RPC layer at all.
+    </p>
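+
+    <p>
+        As a sketch of the discoverability part, an application instance can advertise the endpoint of its (user-provided) RPC layer via the <code>application.server</code> setting; the host and port below are illustrative and must be unique per instance:
+    </p>
+
+    <pre class="brush: java;">
+          Properties props = new Properties();
+          // ... other Kafka Streams settings, e.g. application.id and bootstrap.servers ...
+          // The host:port endpoint of this instance's RPC layer, so that other
+          // instances of the same application can discover and query it
+          props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "myhost.example.com:7070");
+          StreamsConfig config = new StreamsConfig(props);
+        </pre>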
+
+    <h4><a id="streams_developer-guide_interactive-queries_local-stores" href="#streams_developer-guide_interactive-queries_local-stores">Querying local state stores (for an application instance)</a></h4>
+    <p>
+        A Kafka Streams application is typically running on many instances.
+        The state that is locally available on any given instance is only a subset of the application's entire state.
+        Querying the local stores on an instance will, by definition, <i>only return data locally available on that particular instance</i>.
+        We explain how to access data in state stores that are not locally available in section <a href="#streams_developer-guide_interactive-queries_discovery">Querying remote state stores (for the entire application)</a>.
+    </p>
+
+    <p>
+        The method <code>KafkaStreams#store(...)</code> finds an application instance's local state stores <i>by name</i> and <i>by type</i>.
+    </p>
+
+    <figure>
+        <img class="centerd" src="/{{version}}/images/streams-interactive-queries-api-01.png" style="width:500pt;">
+        <figcaption style="text-align: center;"><i>Every application instance can directly query any of its local state stores</i></figcaption>
+    </figure>
+
+    <p>
+        The <i>name</i> of a state store is defined when you are creating the store, either when creating the store explicitly (e.g. when using the Processor API) or when creating the store implicitly (e.g. when using stateful operations in the DSL).
+        We show examples of how to name a state store further down below.
+    </p>
+
+    <p>
+        The <i>type</i> of a state store is defined by <code>QueryableStoreType</code>, and you can access the built-in types via the class <code>QueryableStoreTypes</code>.
+        Kafka Streams currently has two built-in types:
+    </p>
+    <ul>
+        <li>A key-value store <code>QueryableStoreTypes#keyValueStore()</code>, see <a href="#streams_developer-guide_interactive-queries_local-key-value-stores">Querying local key-value stores</a>.</li>
+        <li>A window store <code>QueryableStoreTypes#windowStore()</code>, see <a href="#streams_developer-guide_interactive-queries_local-window-stores">Querying local window stores</a>.</li>
+    </ul>
+
+    <p>
+        Both store types return <i>read-only</i> versions of the underlying state stores.
+        This read-only constraint is important to guarantee that the underlying state stores will never be mutated (e.g. new entries added) out-of-band, i.e. only the corresponding processing topology of Kafka Streams is allowed to mutate and update the state stores in order to ensure data consistency.
+    </p>
+    <p>
+        You can also implement your own <code>QueryableStoreType</code> as described in section <a href="#streams_developer-guide_interactive-queries_custom-stores">Querying local custom state stores</a>.
+    </p>
+
+    <p>
+        Kafka Streams materializes one state store per stream partition, which means your application will potentially manage many underlying state stores.
+        The API to query local state stores enables you to query all of the underlying stores without having to know which partition the data is in.
+        The objects returned from <code>KafkaStreams#store(...)</code> therefore wrap potentially many underlying state stores.
+    </p>
+
+    <h4><a id="streams_developer-guide_interactive-queries_local-key-value-stores" href="#streams_developer-guide_interactive-queries_local-key-value-stores">Querying local key-value stores</a></h4>
+    <p>
+        To query a local key-value store, you must first create a topology with a key-value store:
+    </p>
+
+    <pre class="brush: java;">
+          StreamsConfig config = ...;
+          KStreamBuilder builder = ...;
+          KStream&lt;String, String&gt; textLines = ...;
+
+          // Define the processing topology (here: WordCount)
+          KGroupedStream&lt;String, String&gt; groupedByWord = textLines
+            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+            .groupBy((key, word) -> word, stringSerde, stringSerde);
+
+          // Create a key-value store named "CountsKeyValueStore" for the all-time word counts
+          groupedByWord.count("CountsKeyValueStore");
+
+          // Start an instance of the topology
+          KafkaStreams streams = new KafkaStreams(builder, config);
+          streams.start();
+        </pre>
+
+    <p>
+        Above we created a key-value store named "CountsKeyValueStore".
+        This store will hold the latest count for any word found in the input stream.
+        Once the application has started we can get access to "CountsKeyValueStore" and then query it via the <code>ReadOnlyKeyValueStore</code> API:
+    </p>
+
+    <pre class="brush: java;">
+          // Get the key-value store CountsKeyValueStore
+          ReadOnlyKeyValueStore&lt;String, Long&gt; keyValueStore =
+              streams.store("CountsKeyValueStore", QueryableStoreTypes.keyValueStore());
+
+          // Get value by key
+          System.out.println("count for hello: " + keyValueStore.get("hello"));
+
+          // Get the values for a range of keys available in this application instance
+          KeyValueIterator&lt;String, Long&gt; range = keyValueStore.range("all", "streams");
+          while (range.hasNext()) {
+            KeyValue&lt;String, Long&gt; next = range.next();
+            System.out.println("count for " + next.key + ": " + next.value);
+          }
+          range.close(); // close iterators to release resources
+
+          // Get the values for all of the keys available in this application instance
+          KeyValueIterator&lt;String, Long&gt; all = keyValueStore.all();
+          while (all.hasNext()) {
+            KeyValue&lt;String, Long&gt; next = all.next();
+            System.out.println("count for " + next.key + ": " + next.value);
+          }
+          all.close();
+        </pre>
+
+    <h4><a id="streams_developer-guide_interactive-queries_local-window-stores" href="#streams_developer-guide_interactive-queries_local-window-stores">Querying local window stores</a></h4>
+    <p>
+        A window store differs from a key-value store in that you will potentially have many results for any given key because the key can be present in multiple windows.
+        However, there will only ever be at most one result per window for a given key.
+    </p>
+    <p>
+        To query a local window store, you must first create a topology with a window store:
+    </p>
+
+    <pre class="brush: java;">
+          StreamsConfig config = ...;
+          KStreamBuilder builder = ...;
+          KStream&lt;String, String&gt; textLines = ...;
+
+          // Define the processing topology (here: WordCount)
+          KGroupedStream&lt;String, String&gt; groupedByWord = textLines
+            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+            .groupBy((key, word) -> word, stringSerde, stringSerde);
+
+          // Create a window state store named "CountsWindowStore" that contains the word counts for every minute
+          groupedByWord.count(TimeWindows.of(60000), "CountsWindowStore");
+        </pre>
+
+    <p>
+        Above we created a window store named "CountsWindowStore" that contains the counts for words in 1-minute windows.
+        Once the application has started we can get access to "CountsWindowStore" and then query it via the <code>ReadOnlyWindowStore</code> API:
+    </p>
+
+    <pre class="brush: java;">
+          // Get the window store named "CountsWindowStore"
+          ReadOnlyWindowStore&lt;String, Long&gt; windowStore =
+              streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());
+
+          // Fetch values for the key "world" for all of the windows available in this application instance.
+          // To get *all* available windows we fetch windows from the beginning of time until now.
+          long timeFrom = 0; // beginning of time = oldest available
+          long timeTo = System.currentTimeMillis(); // now (in processing-time)
+          WindowStoreIterator&lt;Long&gt; iterator = windowStore.fetch("world", timeFrom, timeTo);
+          while (iterator.hasNext()) {
+            KeyValue&lt;Long, Long&gt; next = iterator.next();
+            long windowTimestamp = next.key;
+            System.out.println("Count of 'world' @ time " + windowTimestamp + " is " + next.value);
+          }
+        </pre>
+
+    <h4><a id="streams_developer-guide_interactive-queries_custom-stores" href="#streams_developer-guide_interactive-queries_custom-stores">Querying local custom state stores</a></h4>
+    <p>
+        Any custom state stores you use in your Kafka Streams applications can also be queried.
+        However, there are some interfaces that will need to be implemented first:
+    </p>
+
+    <ol>
+        <li>Your custom state store must implement <code>StateStore</code>.</li>
+        <li>You should have an interface to represent the operations available on the store.</li>
+        <li>It is recommended that you also provide an interface that restricts access to read-only operations so users of this API can't mutate the state of your running Kafka Streams application out-of-band.</li>
+        <li>You also need to provide an implementation of <code>StateStoreSupplier</code> for creating instances of your store.</li>
+    </ol>
+
+    <p>
+        The class/interface hierarchy for your custom store might look something like:
+    </p>
+
+    <pre class="brush: java;">
+          public class MyCustomStore&lt;K,V&gt; implements StateStore, MyWriteableCustomStore&lt;K,V&gt; {
+            // implementation of the actual store
+          }
+
+          // Read-write interface for MyCustomStore
+          public interface MyWriteableCustomStore&lt;K,V&gt; extends MyReadableCustomStore&lt;K,V&gt; {
+            void write(K key, V value);
+          }
+
+          // Read-only interface for MyCustomStore
+          public interface MyReadableCustomStore&lt;K,V&gt; {
+            V read(K key);
+          }
+
+          public class MyCustomStoreSupplier implements StateStoreSupplier {
+            // implementation of the supplier for MyCustomStore
+          }
+        </pre>
+
+    <p>
+        To make this store queryable you need to:
+    </p>
+    <ul>
+        <li>Provide an implementation of <code>QueryableStoreType</code>.</li>
+        <li>Provide a wrapper class that will have access to all of the underlying instances of the store and will be used for querying.</li>
+    </ul>
+
+    <p>
+        Implementing <code>QueryableStoreType</code> is straightforward:
+    </p>
+
+    <pre class="brush: java;">
+
+          public class MyCustomStoreType&lt;K,V&gt; implements QueryableStoreType&lt;MyReadableCustomStore&lt;K,V&gt;&gt; {
+
+            // Only accept StateStores that are of type MyCustomStore
+            public boolean accepts(final StateStore stateStore) {
+              return stateStore instanceof MyCustomStore;
+            }
+
+            public MyReadableCustomStore&lt;K,V&gt; create(final StateStoreProvider storeProvider, final String storeName) {
+              return new MyCustomStoreTypeWrapper&lt;&gt;(storeProvider, storeName, this);
+            }
+
+          }
+        </pre>
+
+    <p>
+        A wrapper class is required because even a single instance of a Kafka Streams application may run multiple stream tasks and, by doing so, manage multiple local instances of a particular state store.
+        The wrapper class hides this complexity and lets you query a "logical" state store with a particular name without having to know about all of the underlying local instances of that state store.
+    </p>
+
+    <p>
+        When implementing your wrapper class you will need to make use of the <code>StateStoreProvider</code>
+        interface to get access to the underlying instances of your store.
+        <code>StateStoreProvider#stores(String storeName, QueryableStoreType&lt;T&gt; queryableStoreType)</code> returns a <code>List</code> of state stores with the given <code>storeName</code> and of the type as defined by <code>queryableStoreType</code>.
+    </p>
+    <p>
+        An example implementation of the wrapper follows (Java 8+):
+    </p>
+
+    <pre class="brush: java;">
+          // We strongly recommend implementing a read-only interface
+          // to restrict usage of the store to safe read operations!
+          public class MyCustomStoreTypeWrapper&lt;K,V&gt; implements MyReadableCustomStore&lt;K,V&gt; {
+
+            private final QueryableStoreType&lt;MyReadableCustomStore&lt;K, V&gt;&gt; customStoreType;
+            private final String storeName;
+            private final StateStoreProvider provider;
+
+            public MyCustomStoreTypeWrapper(final StateStoreProvider provider,
+                                            final String storeName,
+                                            final QueryableStoreType&lt;MyReadableCustomStore&lt;K, V&gt;&gt; customStoreType) {
+
+              // ... assign fields ...
+            }
+
+            // Implement a safe read method
+            @Override
+            public V read(final K key) {
+              // Get all the stores with storeName and of customStoreType
+              final List&lt;MyReadableCustomStore&lt;K, V&gt;&gt; stores = provider.stores(storeName, customStoreType);
+              // Return the first non-null value found for the given key, if any
+              final Optional&lt;V&gt; value = stores.stream()
+                  .map(store -> store.read(key))
+                  .filter(v -> v != null)
+                  .findFirst();
+              return value.orElse(null);
+            }
+          }
+        </pre>
+
+    <p>
+        Putting it all together you can now find and query your custom store:
+    </p>
+
+    <pre class="brush: java;">
+          StreamsConfig config = ...;
+          TopologyBuilder builder = ...;
+          ProcessorSupplier processorSupplier = ...;
+
+          // Create a MyCustomStoreSupplier for the store named "the-custom-store"
+          MyCustomStoreSupplier customStoreSupplier = new MyCustomStoreSupplier("the-custom-store");
+          // Add the source topic
+          builder.addSource("input", "inputTopic");
+          // Add a custom processor that reads from the source topic
+          builder.addProcessor("the-processor", processorSupplier, "input");
+          // Connect your custom state store to the custom processor above
+          builder.addStateStore(customStoreSupplier, "the-processor");
+
+          KafkaStreams streams = new KafkaStreams(builder, config);
+          streams.start();
+
+          // Get access to the custom store
+          MyReadableCustomStore&lt;String,String&gt; store = streams.store("the-custom-store", new MyCustomStoreType&lt;String,String&gt;());
+          // Query the store
+          String value = store.read("key");
+        </pre>
+
+    <h4><a id="streams_developer-guide_interactive-queries_discovery" href="#streams_developer-guide_interactive-queries_discovery">Querying remote state stores (for the entire application)</a></h4>
+
+    <p>
+        Typically, the ultimate goal of interactive queries is not just to query locally available state stores from within an instance of a Kafka Streams application, as described in the previous section.
+        Rather, you want to expose the application's full state (i.e. the state across all its instances) to other applications that might be running on different machines.
+        For example, you might have a Kafka Streams application that processes the user events in a multi-player video game, and you want to retrieve the latest status of each user directly from this application so that you can display it in a mobile companion app.
+    </p>
+    <p>
+        Three steps are needed to make the full state of your application queryable:
+    </p>
+
+    <ol>
+        <li>You must <a href="#streams_developer-guide_interactive-queries_rpc-layer">add an RPC layer to your application</a> so that the instances of your application can be interacted with via the network -- notably to respond to interactive queries.
+            By design, Kafka Streams does not provide any such RPC functionality out of the box, so you can freely pick your favorite approach: a REST API, Thrift, a custom protocol, and so on.</li>
+        <li>You need to <a href="#streams_developer-guide_interactive-queries_expose-rpc">expose the respective RPC endpoints</a> of your application's instances via the <code>application.server</code> configuration setting of Kafka Streams.
+            Because RPC endpoints must be unique within a network, each instance will have its own value for this configuration setting.
+            This makes an application instance discoverable by other instances.</li>
+        <li>In the RPC layer, you can then <a href="#streams_developer-guide_interactive-queries_discover-app-instances-and-stores">discover remote application instances</a> and their respective state stores (e.g., to forward queries to other app instances if an instance lacks the local data to respond to a query) as well as <a href="#streams_developer-guide_interactive-queries_local-stores">query locally available state stores</a> (to respond to queries directly). Together, these steps make the full state of your application queryable.</li>
+    </ol>
+
+    <figure>
+        <img class="centered" src="/{{version}}/images/streams-interactive-queries-api-02.png" style="width:500pt;">
+        <figcaption style="text-align: center;"><i>Discover any running instances of the same application as well as the respective RPC endpoints they expose for interactive queries</i></figcaption>
+    </figure>
+
+    <h4><a id="streams_developer-guide_interactive-queries_rpc-layer" href="#streams_developer-guide_interactive-queries_rpc-layer">Adding an RPC layer to your application</a></h4>
+    <p>
+        As Kafka Streams doesn't provide an RPC layer, you are free to choose your favorite approach.
+        There are many ways of doing this, and it will depend on the technologies you have chosen to use.
+        The only requirements are that the RPC layer is embedded within the Kafka Streams application and that it exposes an endpoint that other application instances and applications can connect to.
+    </p>
+
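+    <p>
+        As an illustration, here is a minimal sketch of such an embedded RPC layer using the JDK's built-in
+        <code>com.sun.net.httpserver.HttpServer</code>. The endpoint path, host, and port are assumptions for
+        illustration only, and the sketch assumes a key-value store named "word-count" such as the one created
+        in the next section's example; any other RPC technology (a REST framework, Thrift, gRPC, etc.) would work equally well.
+    </p>
+
+    <pre class="brush: java;">
+
+          import com.sun.net.httpserver.HttpServer;
+          import java.net.InetSocketAddress;
+          import java.nio.charset.StandardCharsets;
+          import org.apache.kafka.streams.KafkaStreams;
+          import org.apache.kafka.streams.state.QueryableStoreTypes;
+          import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+
+          // Assumes `streams` is a started KafkaStreams instance whose topology
+          // contains a key-value store named "word-count".
+          KafkaStreams streams = ...;
+          HttpServer server = HttpServer.create(new InetSocketAddress("host1", 4460), 0);
+          server.createContext("/word-count/", exchange -> {
+            // Extract the word from the request path, e.g. /word-count/alice -> alice
+            String path = exchange.getRequestURI().getPath();
+            String word = path.substring(path.lastIndexOf('/') + 1);
+            // Query the locally available instances of the "word-count" store
+            ReadOnlyKeyValueStore&lt;String, Long&gt; store =
+                streams.store("word-count", QueryableStoreTypes.keyValueStore());
+            Long count = store.get(word);
+            // Respond with the count, or 404 if this instance has no data for the word
+            byte[] body = String.valueOf(count).getBytes(StandardCharsets.UTF_8);
+            exchange.sendResponseHeaders(count == null ? 404 : 200, body.length);
+            exchange.getResponseBody().write(body);
+            exchange.close();
+          });
+          server.start();
+        </pre>
+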
+    <h4><a id="streams_developer-guide_interactive-queries_expose-rpc" href="#streams_developer-guide_interactive-queries_expose-rpc">Exposing the RPC endpoints of your application</a></h4>
+    <p>
+        To enable the remote discovery of state stores running within a (typically distributed) Kafka Streams application you need to set the <code>application.server</code> configuration property in <code>StreamsConfig</code>.
+        The <code>application.server</code> property defines a unique <code>host:port</code> pair that points to the RPC endpoint of the respective instance of a Kafka Streams application.
+        It's important to understand that the value of this configuration property varies across the instances of your application.
+        When this property is set, Kafka Streams will, for every instance of an application, keep track of the instance's RPC endpoint information, its state stores, and its assigned stream partitions through instances of <code>StreamsMetadata</code>.
+    </p>
+    <p>
+        Below is an example of configuring and running a Kafka Streams application that supports the discovery of its state stores.
+    </p>
+
+    <pre class="brush: java;">
+
+          Properties props = new Properties();
+          // Set the unique RPC endpoint of this application instance through which it
+          // can be interactively queried.  In a real application, the value would most
+          // probably not be hardcoded but derived dynamically.
+          String rpcEndpoint = "host1:4460";
+          props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, rpcEndpoint);
+          // ... further settings may follow here ...
+
+          StreamsConfig config = new StreamsConfig(props);
+          KStreamBuilder builder = new KStreamBuilder();
+
+          // Serde for the String keys and values of the input topic
+          Serde&lt;String&gt; stringSerde = Serdes.String();
+
+          KStream&lt;String, String&gt; textLines = builder.stream(stringSerde, stringSerde, "word-count-input");
+
+          KGroupedStream&lt;String, String&gt; groupedByWord = textLines
+              .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+              .groupBy((key, word) -> word, stringSerde, stringSerde);
+
+          // This call to `count()` creates a state store named "word-count".
+          // The state store is discoverable and can be queried interactively.
+          groupedByWord.count("word-count");
+
+          // Start an instance of the topology
+          KafkaStreams streams = new KafkaStreams(builder, config);
+          streams.start();
+
+          // Then, create and start the actual RPC service for remote access to this
+          // application instance's local state stores.
+          //
+          // This service should be started on the same host and port as defined above by
+          // the property `StreamsConfig.APPLICATION_SERVER_CONFIG`.  The example below is
+          // fictitious, but we provide end-to-end demo applications (such as KafkaMusicExample)
+          // that showcase how to implement such a service to get you started.
+          MyRPCService rpcService = ...;
+          rpcService.listenAt(rpcEndpoint);
+        </pre>
+
+    <h4><a id="streams_developer-guide_interactive-queries_discover-app-instances-and-stores" href="#streams_developer-guide_interactive-queries_discover-app-instances-and-stores">Discovering and accessing application instances and their respective local state stores</a></h4>
+    <p>
+        With the <code>application.server</code> property set, we can now find the locations of remote app instances and their state stores.
+        The following methods return <code>StreamsMetadata</code> objects, which provide meta-information about application instances such as their RPC endpoint and locally available state stores.
+    </p>
+    <ul>
+        <li><code>KafkaStreams#allMetadata()</code>: find all instances of this application</li>
+        <li><code>KafkaStreams#allMetadataForStore(String storeName)</code>: find those application instances that manage local instances of the state store "storeName"</li>
+        <li><code>KafkaStreams#metadataForKey(String storeName, K key, Serializer&lt;K&gt; keySerializer)</code>: using the default stream partitioning strategy, find the one application instance that holds the data for the given key in the given state store</li>
+        <li><code>KafkaStreams#metadataForKey(String storeName, K key, StreamPartitioner&lt;K, ?&gt; partitioner)</code>: using <code>partitioner</code>, find the one application instance that holds the data for the given key in the given state store</li>
+    </ul>
+
+    <p>
+        If <code>application.server</code> is not configured for an application instance, then the above methods will not find any <code>StreamsMetadata</code> for it.
+    </p>
+
+    <p>
+        For example, we can now find the <code>StreamsMetadata</code> for the state store named "word-count" that we defined in the code example shown in the previous section:
+    </p>
+
+    <pre class="brush: java;">
+
+          KafkaStreams streams = ...;
+          // Find all the locations of local instances of the state store named "word-count"
+          Collection&lt;StreamsMetadata&gt; wordCountHosts = streams.allMetadataForStore("word-count");
+
+          // For illustrative purposes, we assume using an HTTP client to talk to remote app instances.
+          HttpClient http = ...;
+
+          // Get the word count for word (aka key) 'alice': Approach 1
+          //
+          // We first find the one app instance that manages the count for 'alice' in its local state stores.
+          StreamsMetadata metadata = streams.metadataForKey("word-count", "alice", Serdes.String().serializer());
+          // Then, we query only that single app instance for the latest count of 'alice'.
+          // Note: The RPC URL shown below is fictitious and only serves to illustrate the idea.  Ultimately,
+          // the URL (or, in general, the method of communication) will depend on the RPC layer you opted to
+          // implement.  Again, we provide end-to-end demo applications (such as KafkaMusicExample) that showcase
+          // how to implement such an RPC layer.
+          Long result = http.getLong("http://" + metadata.host() + ":" + metadata.port() + "/word-count/alice");
+
+          // Get the word count for word (aka key) 'alice': Approach 2
+          //
+          // Alternatively, we could also choose (say) a brute-force approach where we query every app instance
+          // until we find the one that happens to know about 'alice'.
+          Optional&lt;Long&gt; bruteForceResult = streams.allMetadataForStore("word-count")
+              .stream()
+              .map(streamsMetadata -> {
+                  // Construct the (fictitious) full endpoint URL to query the current remote application instance
+                  String url = "http://" + streamsMetadata.host() + ":" + streamsMetadata.port() + "/word-count/alice";
+                  // Read and return the count for 'alice', if any.
+                  return http.getLong(url);
+              })
+              .filter(s -> s != null)
+              .findFirst();
+        </pre>
+
+    <p>
+        At this point the full state of the application is interactively queryable:
+    </p>
+    <ul>
+        <li>We can discover the running instances of the application as well as the state stores they manage locally.</li>
+        <li>Through the RPC layer that was added to the application, we can communicate with these application instances over the network and query them for locally available state.</li>
+        <li>The application instances are able to serve such queries because they can directly query their own local state stores and respond via the RPC layer.</li>
+        <li>Collectively, this allows us to query the full state of the entire application, as the sketch following this list illustrates.</li>
+    </ul>
+
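+    <p>
+        The sketch below ties these pieces together. It is hypothetical and reuses the fictitious HTTP client and
+        endpoint URLs from above: an instance first checks whether it holds the data for a key locally and, if not,
+        forwards the query to the instance that does.
+    </p>
+
+    <pre class="brush: java;">
+
+          // Hypothetical helper: return the count for `word`, querying the local
+          // store if this instance owns the key, and forwarding over RPC otherwise.
+          // Assumes `streams`, `http`, and this instance's own `rpcEndpoint` (the
+          // value it set for `application.server`, e.g. "host1:4460") are in scope.
+          public Long wordCount(final String word) {
+            final StreamsMetadata metadata =
+                streams.metadataForKey("word-count", word, Serdes.String().serializer());
+            final String owner = metadata.host() + ":" + metadata.port();
+            if (owner.equals(rpcEndpoint)) {
+              // The key is hosted locally: query the local store directly
+              final ReadOnlyKeyValueStore&lt;String, Long&gt; store =
+                  streams.store("word-count", QueryableStoreTypes.keyValueStore());
+              return store.get(word);
+            } else {
+              // The key lives on another instance: forward the query over the RPC layer
+              return http.getLong("http://" + owner + "/word-count/" + word);
+            }
+          }
+        </pre>
+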
     <h3><a id="streams_configure_execute" href="#streams_configure_execute">Application Configuration and Execution</a></h3>
 
     <p>
-    Besides defining the topology, developers will also need to configure their applications
-    in <code>StreamsConfig</code> before running it. A complete list of
-    Kafka Streams configs can be found <a href="/{{version}}/documentation/#streamsconfigs"><b>here</b></a>.
-    Note, that different parameters do have different "levels of importance", with the following interpretation:
+        Besides defining the topology, developers will also need to configure their applications
+        in <code>StreamsConfig</code> before running it. A complete list of
+        Kafka Streams configs can be found <a href="/{{version}}/documentation/#streamsconfigs"><b>here</b></a>.
+        Note that different parameters have different "levels of importance", with the following interpretation:
     </p>
     <ul>
         <li> HIGH: you would most likely change the default value if you go to production </li>
@@ -421,8 +939,8 @@
     </ul>
 
     <p>
-    Specifying the configuration in Kafka Streams is similar to the Kafka Producer and Consumer clients. Typically, you create a <code>java.util.Properties</code> instance,
-    set the necessary parameters, and construct a <code>StreamsConfig</code> instance from the <code>Properties</code> instance.
+        Specifying the configuration in Kafka Streams is similar to the Kafka Producer and Consumer clients. Typically, you create a <code>java.util.Properties</code> instance,
+        set the necessary parameters, and construct a <code>StreamsConfig</code> instance from the <code>Properties</code> instance.
     </p>
 
     <pre class="brush: java;">
@@ -447,11 +965,11 @@
 
     <h4><a id="streams_client_config" href="#streams_clients_config">Producer and Consumer Configuration</a></h4>
     <p>
-    Apart from Kafka Streams' own configuration parameters you can also specify parameters for the Kafka consumers and producers that are used internally,
-    depending on the needs of your application. Similar to the Streams settings you define any such consumer and/or producer settings via <code>StreamsConfig</code>.
-    Note that some consumer and producer configuration parameters do use the same parameter name. For example, <code>send.buffer.bytes</code> or <code>receive.buffer.bytes</code> which
-    are used to configure TCP buffers; <code>request.timeout.ms</code> and <code>retry.backoff.ms</code> which control retries for client request (and some more).
-    If you want to set different values for consumer and producer for such a parameter, you can prefix the parameter name with <code>consumer.</code> or <code>producer.</code>:
+        Apart from Kafka Streams' own configuration parameters you can also specify parameters for the Kafka consumers and producers that are used internally,
+        depending on the needs of your application. Similar to the Streams settings you define any such consumer and/or producer settings via <code>StreamsConfig</code>.
+        Note that some consumer and producer configuration parameters use the same parameter name. For example, <code>send.buffer.bytes</code> and <code>receive.buffer.bytes</code>, which
+        are used to configure TCP buffers, or <code>request.timeout.ms</code> and <code>retry.backoff.ms</code>, which control retries for client requests (among others).
+        If you want to set different values for consumer and producer for such a parameter, you can prefix the parameter name with <code>consumer.</code> or <code>producer.</code>:
     </p>
 
     <pre class="brush: java;">
@@ -492,14 +1010,14 @@
 
     <h4><a id="streams_execute" href="#streams_execute">Executing Your Kafka Streams Application</a></h4>
     <p>
-    You can call Kafka Streams from anywhere in your application code.
-    Very commonly though you would do so within the <code>main()</code> method of your application, or some variant thereof.
+        You can call Kafka Streams from anywhere in your application code.
+        Very commonly though you would do so within the <code>main()</code> method of your application, or some variant thereof.
     </p>
 
     <p>
-    First, you must create an instance of <code>KafkaStreams</code>. The first argument of the <code>KafkaStreams</code> constructor takes a topology
-    builder (either <code>KStreamBuilder</code> for the Kafka Streams DSL, or <code>TopologyBuilder</code> for the Processor API)
-    that is used to define a topology; The second argument is an instance of <code>StreamsConfig</code> mentioned above.
+        First, you must create an instance of <code>KafkaStreams</code>. The first argument of the <code>KafkaStreams</code> constructor takes a topology
+        builder (either <code>KStreamBuilder</code> for the Kafka Streams DSL, or <code>TopologyBuilder</code> for the Processor API)
+        that is used to define a topology; the second argument is an instance of the <code>StreamsConfig</code> mentioned above.
     </p>
 
     <pre class="brush: java;">
@@ -527,7 +1045,7 @@
     </pre>
 
     <p>
-    At this point, internal structures have been initialized, but the processing is not started yet. You have to explicitly start the Kafka Streams thread by calling the <code>start()</code> method:
+        At this point, internal structures have been initialized, but the processing is not started yet. You have to explicitly start the Kafka Streams thread by calling the <code>start()</code> method:
     </p>
 
     <pre class="brush: java;">
@@ -536,7 +1054,7 @@
     </pre>
 
     <p>
-    To catch any unexpected exceptions, you may set an <code>java.lang.Thread.UncaughtExceptionHandler</code> before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception:
+        To catch any unexpected exceptions, you may set a <code>java.lang.Thread.UncaughtExceptionHandler</code> before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception:
     </p>
 
     <pre class="brush: java;">
@@ -548,7 +1066,7 @@
     </pre>
 
     <p>
-    To stop the application instance call the <code>close()</code> method:
+        To stop the application instance call the <code>close()</code> method:
     </p>
 
     <pre class="brush: java;">
@@ -566,13 +1084,13 @@
     </pre>
 
     <p>
-    When the application instance starts running, the defined processor topology will be initialized as one or more stream tasks that can be executed in parallel by the stream threads within the instance.
-    If the processor topology defines any state stores, these state stores will also be (re-)constructed, if possible, during the initialization
-    period of their associated stream tasks.
-    It is important to understand that, when starting your application as described above, you are actually launching what Kafka Streams considers to be one instance of your application.
-    More than one instance of your application may be running at a time, and in fact the common scenario is that there are indeed multiple instances of your application running in parallel (e.g., on another JVM or another machine).
-    In such cases, Kafka Streams transparently re-assigns tasks from the existing instances to the new instance that you just started.
-    See <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks"><b>Stream Partitions and Tasks</b></a> and <a href="/{{version}}/documentation/streams/architecture#streams_architecture_threads"><b>Threading Model</b></a> for details.
+        When the application instance starts running, the defined processor topology will be initialized as one or more stream tasks that can be executed in parallel by the stream threads within the instance.
+        If the processor topology defines any state stores, these state stores will also be (re-)constructed, if possible, during the initialization
+        period of their associated stream tasks.
+        It is important to understand that, when starting your application as described above, you are actually launching what Kafka Streams considers to be one instance of your application.
+        More than one instance of your application may be running at a time, and in fact the common scenario is that there are indeed multiple instances of your application running in parallel (e.g., on another JVM or another machine).
+        In such cases, Kafka Streams transparently re-assigns tasks from the existing instances to the new instance that you just started.
+        See <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks"><b>Stream Partitions and Tasks</b></a> and <a href="/{{version}}/documentation/streams/architecture#streams_architecture_threads"><b>Threading Model</b></a> for details.
     </p>
 
     <div class="pagination">
@@ -584,9 +1102,9 @@
 <!--#include virtual="../../includes/_header.htm" -->
 <!--#include virtual="../../includes/_top.htm" -->
 <div class="content documentation documentation--current">
-	<!--#include virtual="../../includes/_nav.htm" -->
-	<div class="right">
-		<!--#include virtual="../../includes/_docs_banner.htm" -->
+    <!--#include virtual="../../includes/_nav.htm" -->
+    <div class="right">
+        <!--#include virtual="../../includes/_docs_banner.htm" -->
         <ul class="breadcrumbs">
             <li><a href="/documentation">Documentation</a></li>
             <li><a href="/documentation/streams">Streams</a></li>

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/422095a8/0110/streams/index.html
----------------------------------------------------------------------
diff --git a/0110/streams/index.html b/0110/streams/index.html
index 89276f8..013494b 100644
--- a/0110/streams/index.html
+++ b/0110/streams/index.html
@@ -15,7 +15,7 @@
  limitations under the License.
 -->
 
-<script><!--#include virtual="../js/templateData.js" --></script>
+<script><!--#include virtual="js/templateData.js" --></script>
 
 <script id="streams-template" type="text/x-handlebars-template">
     <h1>Streams</h1>
@@ -32,6 +32,7 @@
             <ul>
                 <li><a href="/{{version}}/documentation/streams/developer-guide#streams_processor">Low-level Processor API</a></li>
                 <li><a href="/{{version}}/documentation/streams/developer-guide#streams_dsl">High-level Streams DSL</a></li>
+                <li><a href="/{{version}}/documentation/streams/developer-guide#streams_interactive_querie">Interactive Queries</a></li>
                 <li><a href="/{{version}}/documentation/streams/developer-guide#streams_execute">Application Configuration and Execution</a></li>
             </ul>
         </li>
@@ -70,19 +71,19 @@
     </div>
 </script>
 
-<!--#include virtual="../../includes/_header.htm" -->
-<!--#include virtual="../../includes/_top.htm" -->
+<!--#include virtual="../includes/_header.htm" -->
+<!--#include virtual="../includes/_top.htm" -->
 <div class="content documentation documentation--current">
-	<!--#include virtual="../../includes/_nav.htm" -->
-	<div class="right">
-		<!--#include virtual="../../includes/_docs_banner.htm" -->
+    <!--#include virtual="../includes/_nav.htm" -->
+    <div class="right">
+        <!--#include virtual="../includes/_docs_banner.htm" -->
         <ul class="breadcrumbs">
             <li><a href="/documentation">Documentation</a></li>
         </ul>
         <div class="p-streams"></div>
     </div>
 </div>
-<!--#include virtual="../../includes/_footer.htm" -->
+<!--#include virtual="../includes/_footer.htm" -->
 <script>
 $(function() {
   // Show selected style on nav item

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/422095a8/0110/streams/upgrade-guide.html
----------------------------------------------------------------------
diff --git a/0110/streams/upgrade-guide.html b/0110/streams/upgrade-guide.html
index 57eb727..8ec3e22 100644
--- a/0110/streams/upgrade-guide.html
+++ b/0110/streams/upgrade-guide.html
@@ -21,21 +21,21 @@
     <h1>Upgrade Guide &amp; API Changes</h1>
 
     <p>
-    If you want to upgrade from 0.10.2.x to 0.11.0 you don't need to do any code changes as the public API is fully backward compatible.
-    However, some configuration parameters were deprecated and thus it is recommend to update your code eventually to allow for future upgrades.
-    See <a href="#streams_api_changes_0110">below</a> a complete list of 0.11.0 API and semantical changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+        If you want to upgrade from 0.10.2.x to 0.11.0 you don't need to do any code changes as the public API is fully backward compatible.
+        However, some configuration parameters were deprecated and thus it is recommended to update your code eventually to allow for future upgrades.
+        See <a href="#streams_api_changes_0110">below</a> for a complete list of 0.11.0 API and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
     </p>
 
     <p>
-    If you want to upgrade from 0.10.1.x to 0.10.2, see the <a href="/{{version}}/documentation/#upgrade_1020_streams">Upgrade Section for 0.10.2</a>.
-    It highlights incompatible changes you need to consider to upgrade your code and application.
-    See <a href="#streams_api_changes_0102">below</a> a complete list of 0.10.2 API and semantical changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+        If you want to upgrade from 0.10.1.x to 0.10.2, see the <a href="/{{version}}/documentation/#upgrade_1020_streams">Upgrade Section for 0.10.2</a>.
+        It highlights incompatible changes you need to consider to upgrade your code and application.
+        See <a href="#streams_api_changes_0102">below</a> a complete list of 0.10.2 API and semantical changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
     </p>
 
     <p>
-    If you want to upgrade from 0.10.0.x to 0.10.1, see the <a href="/{{version}}/documentation/#upgrade_1010_streams">Upgrade Section for 0.10.1</a>.
-    It highlights incompatible changes you need to consider to upgrade your code and application.
-    See <a href="#streams_api_changes_0101">below</a> a complete list of 0.10.1 API changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+        If you want to upgrade from 0.10.0.x to 0.10.1, see the <a href="/{{version}}/documentation/#upgrade_1010_streams">Upgrade Section for 0.10.1</a>.
+        It highlights incompatible changes you need to consider to upgrade your code and application.
+        See <a href="#streams_api_changes_0101">below</a> a complete list of 0.10.1 API changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
     </p>
 
     <h3><a id="streams_api_changes_0110" href="#streams_api_changes_0110">Streams API changes in 0.11.0.0</a></h3>
@@ -100,12 +100,12 @@
         <li> at-least-once (default): <code>[client.Id]-StreamThread-[sequence-number]</code> </li>
         <li> exactly-once: <code>[client.Id]-StreamThread-[sequence-number]-[taskId]</code> </li>
     </ul>
-    <p> <code>[client.Id]</code> is either set via Streams configuration parameter <code>client.id</code> or defaults to <code>[application.id]-[processId]</code> (<code>[processId]</code> is a random UUID). </p>
+    <p> <code>[client.Id]</code> is either set via Streams configuration parameter <code>client.id</code> or defaults to <code>[application.id]-[processId]</code> (<code>[processId]</code> is a random UUID). </p>
 
     <h3><a id="streams_api_changes_01021" href="#streams_api_changes_01021">Notable changes in 0.10.2.1</a></h3>
 
     <p>
-    Parameter updates in <code>StreamsConfig</code>:
+        Parameter updates in <code>StreamsConfig</code>:
     </p>
     <ul>
         <li> The default config values of embedded producer's <code>retries</code> and consumer's <code>max.poll.interval.ms</code> have been changed to improve the resiliency of a Kafka Streams application </li>
@@ -137,7 +137,7 @@
     <ul>
         <li> removed methods: <code>#addLatencySensor()</code> </li>
         <li> added methods: <code>#addLatencyAndThroughputSensor()</code>, <code>#addThroughputSensor()</code>, <code>#recordThroughput()</code>,
-        <code>#addSensor()</code>, <code>#removeSensor()</code> </li>
+            <code>#addSensor()</code>, <code>#removeSensor()</code> </li>
     </ul>
 
     <p> New methods in <code>TopologyBuilder</code>: </p>
@@ -157,7 +157,7 @@
         <li> added overloads for <code>#join()</code> to join with <code>KTable</code> </li>
         <li> added overloads for <code>#join()</code> and <code>leftJoin()</code> to join with <code>GlobalKTable</code> </li>
         <li> note, join semantics in 0.10.2 were improved and thus you might see different result compared to 0.10.0.x and 0.10.1.x
-                (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics">Kafka Streams Join Semantics</a> in the Apache Kafka wiki)
+            (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics">Kafka Streams Join Semantics</a> in the Apache Kafka wiki)
     </ul>
 
     <p> Aligned <code>null</code>-key handling for <code>KTable</code> joins: </p>
@@ -169,14 +169,14 @@
     <ul>
         <li> added class <code>SessionWindows</code> to specify session windows </li>
         <li> added overloads for <code>KGroupedStream</code> methods <code>#count()</code>, <code>#reduce()</code>, and <code>#aggregate()</code>
-                to allow session window aggregations </li>
+            to allow session window aggregations </li>
     </ul>
 
     <p> Changes to <code>TimestampExtractor</code>: </p>
     <ul>
         <li> method <code>#extract()</code> has a second parameter now </li>
         <li> new default timestamp extractor class <code>FailOnInvalidTimestamp</code>
-                (it gives the same behavior as old (and removed) default extractor <code>ConsumerRecordTimestampExtractor</code>) </li>
+            (it gives the same behavior as old (and removed) default extractor <code>ConsumerRecordTimestampExtractor</code>) </li>
         <li> new alternative timestamp extractor classes <code>LogAndSkipOnInvalidTimestamp</code> and <code>UsePreviousTimeOnInvalidTimestamps</code> </li>
     </ul>
 
@@ -226,9 +226,9 @@
 <!--#include virtual="../../includes/_header.htm" -->
 <!--#include virtual="../../includes/_top.htm" -->
 <div class="content documentation documentation--current">
-	<!--#include virtual="../../includes/_nav.htm" -->
-	<div class="right">
-		<!--#include virtual="../../includes/_docs_banner.htm" -->
+    <!--#include virtual="../../includes/_nav.htm" -->
+    <div class="right">
+        <!--#include virtual="../../includes/_docs_banner.htm" -->
         <ul class="breadcrumbs">
             <li><a href="/documentation">Documentation</a></li>
             <li><a href="/documentation/streams">Streams</a></li>

