kafka-commits mailing list archives

From damian...@apache.org
Subject kafka git commit: MINOR: First cut at porting State Store docs to AK
Date Fri, 11 Aug 2017 08:35:35 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8ffb1a1fe -> b2b529522


MINOR: First cut at porting State Store docs to AK

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Eno Thereska <eno.thereska@gmail.com>, Damian Guy <damian.guy@gmail.com>

Closes #3629 from bbejeck/docs-updates-for-kip-167


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b2b52952
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b2b52952
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b2b52952

Branch: refs/heads/trunk
Commit: b2b529522d0683b4c3f97be8ac92597048f45037
Parents: 8ffb1a1
Author: Bill Bejeck <bill@confluent.io>
Authored: Fri Aug 11 09:35:26 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Fri Aug 11 09:35:26 2017 +0100

----------------------------------------------------------------------
 docs/streams/developer-guide.html | 234 ++++++++++++++++++++++++++++-----
 1 file changed, 200 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b2b52952/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 97de11c..bcbce24 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -161,48 +161,214 @@
         <li>Finally three sink nodes are added to complete the topology using the <code>addSink</code>
method, each piping from a different parent processor node and writing to a separate topic.</li>
     </ul>
 
-    <h4><a id="streams_processor_statestore" href="#streams_processor_statestore">State
Stores</a></h4>
+<h4><a id="streams_processor_statestore" href="#streams_processor_statestore">State
Stores</a></h4>
+
+<p>
+In order to make state stores fault-tolerant (e.g., to recover from machine crashes) as well
as to allow for state store migration without data loss (e.g., to migrate a stateful stream
task from one machine to another when elastically adding or removing capacity from your application),
a state store can be <strong>continuously backed up</strong> to a Kafka topic
behind the scenes. 
+We sometimes refer to this topic as the state store's associated <em>changelog topic</em>
or simply its <em>changelog</em>. 
+In the case of a machine failure, for example, the state store and thus the application's
state can be fully restored from its changelog. 
+You can enable or disable this backup feature for a state store, and thus its fault tolerance.
+</p>
+
+<p>
+By default, persistent <strong>key-value stores</strong> are fault-tolerant.

+They are backed by a <a href="https://kafka.apache.org/documentation.html#compaction">compacted</a>
changelog topic. 
+The purpose of compacting this topic is to prevent the topic from growing indefinitely, to
reduce the storage consumed in the associated Kafka cluster, and to minimize recovery time
if a state store needs to be restored from its changelog topic.
+</p>
+
+<p>
+Similarly, persistent <strong>window stores</strong> are fault-tolerant. 
+They are backed by a topic that uses both <em>compaction</em> and <em>deletion</em>.

+Using deletion in addition to compaction is required for the changelog topics of window stores
because of the structure of the message keys that are being sent to the changelog topics:
for window stores, the message keys are composite keys that include not only the &quot;normal&quot;
key but also window timestamps. 
+For such composite keys it would not be sufficient to enable just compaction in order to
prevent a changelog topic from growing out of bounds. 
+With deletion enabled, old windows that have expired will be cleaned up by Kafka's log cleaner
as the log segments expire. 
+The default retention setting is <code>Windows#maintainMs()</code> + 1 day. This setting can be overridden by specifying <code>StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG</code> in the <code>StreamsConfig</code>.
+</p>
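<p>
As an illustration only, the additional retention could be set when configuring the application. A minimal sketch (the application id and bootstrap servers below are placeholder values, and the extra retention is given in milliseconds):
</p>

<pre class="brush: java;">
  import java.util.Properties;
  import org.apache.kafka.streams.StreamsConfig;

  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-counts-app");   // placeholder
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
  // keep window changelog records for an extra 12 hours beyond the windows' maintain time
  props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 12 * 60 * 60 * 1000L);
</pre>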
+
+<h4><a id="monitoring-the-restoration-progress-of-fault-tolerant-state-stores" href="#restoration_progress">Monitoring
the Restoration Progress of Fault-tolerant State Stores</a></h4>
+
+<p>
+When starting up your application, any fault-tolerant state stores typically do not need a restoration process, as the persisted state is read from local disk. 
+However, there are situations in which a full restore from the backing changelog topic is required (e.g., a failure wiped out the local state, or your application runs in a stateless environment and persisted data is lost on restarts).
+</p>
+
+<p>
+If you have a significant amount of data in the changelog topic, the restoration process
could take a non-negligible amount of time. 
+Given that processing of new data won't start until the restoration process is completed,
having a window into the progress of restoration is useful.
+</p>
+
+<p>
+In order to observe the restoration of all state stores, you provide your application with an instance of the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> interface. 
+You set the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> by calling the <code>KafkaStreams#setGlobalStateRestoreListener</code> method.
+</p>
+
+<p>
+ A basic implementation example that prints restoration status to the console:
+</p>
+
+<pre class="brush: java;">
+  import org.apache.kafka.common.TopicPartition;
+  import org.apache.kafka.streams.processor.StateRestoreListener;
+
+  public class ConsoleGlobalRestoreListener implements StateRestoreListener {
+
+      @Override
+      public void onRestoreStart(final TopicPartition topicPartition,
+                                 final String storeName,
+                                 final long startingOffset,
+                                 final long endingOffset) {
+
+          System.out.print("Started restoration of " + storeName + " partition " + topicPartition.partition());
+          System.out.println(" total records to be restored " + (endingOffset - startingOffset));
+      }
+
+      @Override
+      public void onBatchRestored(final TopicPartition topicPartition,
+                                  final String storeName,
+                                  final long batchEndOffset,
+                                  final long numRestored) {
+
+          System.out.println("Restored batch " + numRestored + " for " + storeName + " partition
" + topicPartition.partition());
+
+      }
+
+      @Override
+      public void onRestoreEnd(final TopicPartition topicPartition,
+                               final String storeName,
+                               final long totalRestored) {
+
+          System.out.println("Restoration complete for " + storeName + " partition " + topicPartition.partition());
+      }
+  }
+</pre>
+
+<blockquote>
+<p>
+  The <code>StateRestoreListener</code> instance is shared across all <code>org.apache.kafka.streams.processor.internals.StreamThread</code>
instances and it is assumed all methods are stateless. 
+  If any stateful operations are desired, then the user will need to provide synchronization
internally.
+</p>
+</blockquote>
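<p>
To register such a listener, you pass an instance to the <code>KafkaStreams</code> client. A minimal sketch, assuming a <code>topology</code> and configuration <code>props</code> defined as in the other examples in this guide:
</p>

<pre class="brush: java;">
  import org.apache.kafka.streams.KafkaStreams;

  KafkaStreams streams = new KafkaStreams(topology, props);
  // register the restore listener before starting the instance
  streams.setGlobalStateRestoreListener(new ConsoleGlobalRestoreListener());
  streams.start();
</pre>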
+
+<h4> <a id="enable-disable-fault-tolerance-of-state-stores-store-changelogs" href="#disable-chagelogs">Enable
/ Disable Fault Tolerance of State Stores (Store Changelogs)</a></h4>
+
+<p>
+  You can enable or disable fault tolerance for a state store by enabling or disabling, respectively,
the changelogging of the store through <code>enableLogging()</code> and <code>disableLogging()</code>.

+  You can also fine-tune the associated topic’s configuration if needed.
+</p>
+
+<p>Example for disabling fault-tolerance:</p>
+
+<pre class="brush: java;">
+
+  import org.apache.kafka.common.serialization.Serdes;
+  import org.apache.kafka.streams.processor.StateStoreSupplier;
+  import org.apache.kafka.streams.state.Stores;
+
+  StateStoreSupplier countStoreSupplier = Stores.create("Counts")
+              .withKeys(Serdes.String())
+              .withValues(Serdes.Long())
+              .persistent()
+              .disableLogging() // disable backing up the store to a changelog topic
+              .build();
+
+</pre>
+
+<blockquote>
+<p>If the changelog is disabled, then the attached state store is no longer fault-tolerant and it cannot have any standby replicas.</p>
+</blockquote>
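<p>
Conversely, if changelogging is left enabled, you can configure standby replicas of the store through the <code>num.standby.replicas</code> setting. A minimal sketch (the application id and bootstrap servers are placeholder values):
</p>

<pre class="brush: java;">
  import java.util.Properties;
  import org.apache.kafka.streams.StreamsConfig;

  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");       // placeholder
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
  // keep one hot-standby copy of each logged state store on another instance
  props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
</pre>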
+
+<p>
+   Example for enabling fault tolerance, with additional changelog-topic configuration: you can add any log config 
+   from <code>kafka.log.LogConfig</code> (see <code>core/src/main/scala/kafka/log/LogConfig.scala</code>). Unrecognized configs will be ignored.
+</p>
+
+<pre class="brush: java;">
 
-    <p>
-        Note that the <code>Processor</code> API is not limited to only accessing
the current records as they arrive in the <code>process()</code> method, but can
also maintain processing states
-        that keep recently arrived records to use in stateful processing operations such
as windowed joins or aggregation.
-        To take advantage of these states, users can define a state store by implementing
the <code>StateStore</code> interface (the Kafka Streams library also has a few
extended interfaces such as <code>KeyValueStore</code>);
-        in practice, though, users usually do not need to customize such a state store from
scratch but can simply use the <code>Stores</code> factory to define a state store
by specifying whether it should be persistent, log-backed, etc.
-        In the following example, a persistent key-value store named "Counts" with key type
<code>String</code> and value type <code>Long</code> is created.
-    </p>
+  import java.util.HashMap;
+  import java.util.Map;
+
+  import org.apache.kafka.common.serialization.Serdes;
+  import org.apache.kafka.streams.processor.StateStoreSupplier;
+  import org.apache.kafka.streams.state.Stores;
+
+  Map&lt;String, String&gt; changelogConfig = new HashMap&lt;&gt;();
+  // override min.insync.replicas
+  changelogConfig.put("min.insync.replicas", "1");
+
+  StateStoreSupplier countStoreSupplier = Stores.create("Counts")
+              .withKeys(Serdes.String())
+              .withValues(Serdes.Long())
+              .persistent()
+              .enableLogging(changelogConfig) // enable changelogging, with custom changelog settings
+              .build();
 
-    <pre class="brush: java;">
-    StateStoreSupplier countStore = Stores.create("Counts")
-    .withKeys(Serdes.String())
-    .withValues(Serdes.Long())
-    .persistent()
-    .build();
-    </pre>
+</pre>
 
-    <p>
-        To take advantage of these state stores, developers can use the <code>Topology.addStateStore</code>
method when building the
-        processor topology to create the local state and associate it with the processor
nodes that needs to access it; or they can connect a created
-        state store with the existing processor nodes through <code>Topology.connectProcessorAndStateStores</code>.
-    </p>
+<h4><a id="implementing-custom-state-stores" href="#implement-custom-store">Implementing
custom State Stores</a></h4>
 
-    <pre class="brush: java;">
-    Topology topology = new Topology();
+<p>
+ Apart from using the built-in state store types, you can also implement your own. 
+ The primary interface to implement for the store is <code>org.apache.kafka.streams.processor.StateStore</code>.

+ Beyond that, Kafka Streams also has a few extended interfaces such as <code>KeyValueStore</code>.
+</p>
 
-    topology.addSource("SOURCE", "src-topic")
+<p>
+  In addition to the actual store, you also need to provide a &quot;factory&quot;
for the store by implementing the <code>org.apache.kafka.streams.processor.StateStoreSupplier</code>
interface, which Kafka Streams uses to create instances of your store.
+</p>
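<p>
  As a rough sketch, a supplier for a hypothetical <code>MyCustomStore</code> (a class you would implement against the <code>StateStore</code> interface) could look like the following; the store name, configuration and logging choices shown here are illustrative assumptions:
</p>

<pre class="brush: java;">
  import java.util.Collections;
  import java.util.Map;

  import org.apache.kafka.streams.processor.StateStore;
  import org.apache.kafka.streams.processor.StateStoreSupplier;

  // MyCustomStore is a hypothetical class implementing org.apache.kafka.streams.processor.StateStore
  public class MyCustomStoreSupplier implements StateStoreSupplier {

      @Override
      public String name() {
          return "my-custom-store"; // the name used to register and look up the store
      }

      @Override
      public StateStore get() {
          return new MyCustomStore(name()); // create a new store instance on each call
      }

      @Override
      public Map&lt;String, String&gt; logConfig() {
          return Collections.emptyMap(); // no extra changelog topic configuration
      }

      @Override
      public boolean loggingEnabled() {
          return true; // back the store with a changelog topic
      }
  }
</pre>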
 
-    .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
-    // add the created state store "COUNTS" associated with processor "PROCESS1"
-    .addStateStore(countStore, "PROCESS1")
-    .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate
MyProcessor3 */, "PROCESS1")
-    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate
MyProcessor3 */, "PROCESS1")
+<p>
+  You also have the option of providing an <code>org.apache.kafka.streams.processor.StateRestoreCallback</code> instance used to restore the state store from its backing changelog topic. 
+  This is done via the <code>org.apache.kafka.streams.processor.ProcessorContext#register</code> call inside the <code>StateStore#init</code> call.
+</p>
 
-    // connect the state store "COUNTS" with processor "PROCESS2"
-    .connectProcessorAndStateStores("PROCESS2", "COUNTS");
+<pre class="brush: java;">
+  public void init(ProcessorContext context, StateStore store) {
+      // register the store along with the restore callback used to reload it from its changelog
+      context.register(store, false, stateRestoreCallbackInstance);
+  }
+</pre>
+
+<p>
+  There is an additional interface <code>org.apache.kafka.streams.processor.BatchingStateRestoreCallback</code>
that provides bulk restoration semantics vs. the single record-at-a-time restoration semantics
offered by the <code>StateRestoreCallback</code> interface.
+</p>
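<p>
  For illustration only, a batching callback that restores raw changelog records into a hypothetical in-memory map might look as follows (this sketch assumes <code>BatchingStateRestoreCallback</code> extends <code>StateRestoreCallback</code>, so both methods are implemented):
</p>

<pre class="brush: java;">
  import java.util.Collection;
  import java.util.HashMap;
  import java.util.Map;

  import org.apache.kafka.common.utils.Bytes;
  import org.apache.kafka.streams.KeyValue;
  import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;

  public class InMemoryMapRestoreCallback implements BatchingStateRestoreCallback {

      // hypothetical in-memory representation of the store's contents
      private final Map&lt;Bytes, byte[]&gt; map = new HashMap&lt;&gt;();

      @Override
      public void restoreAll(final Collection&lt;KeyValue&lt;byte[], byte[]&gt;&gt; records) {
          // bulk restoration: apply a whole batch of changelog records at once
          for (final KeyValue&lt;byte[], byte[]&gt; record : records) {
              map.put(Bytes.wrap(record.key), record.value);
          }
      }

      @Override
      public void restore(final byte[] key, final byte[] value) {
          // single-record restoration, inherited from StateRestoreCallback
          map.put(Bytes.wrap(key), value);
      }
  }
</pre>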
+
+<p>
+  Additionally, there are two abstract classes that implement <code>StateRestoreCallback</code> or <code>BatchingStateRestoreCallback</code> in conjunction with the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> interface (<code>org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback</code> and <code>org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback</code>, respectively) that provide the ability for the state store to receive notification of restoration progress for that store. 
+  The <code>StateRestoreListener</code> in this case is per state store instance and is used for internal purposes such as updating config settings based on the status of the restoration process.
+</p>
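<p>
  As a sketch of the non-batching variant, a callback extending <code>AbstractNotifyingRestoreCallback</code> could override only the listener methods it cares about (the restore logic below is a placeholder, and this assumes the remaining listener methods default to no-ops):
</p>

<pre class="brush: java;">
  import org.apache.kafka.common.TopicPartition;
  import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;

  public class LoggingRestoreCallback extends AbstractNotifyingRestoreCallback {

      @Override
      public void restore(final byte[] key, final byte[] value) {
          // placeholder: apply a single changelog record to the underlying store
      }

      @Override
      public void onRestoreEnd(final TopicPartition topicPartition,
                               final String storeName,
                               final long totalRestored) {
          System.out.println("Restored " + totalRestored + " records for store " + storeName);
      }
  }
</pre>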
+
+<h4><a id="connecting-processors-and-state-stores" href="#connecting-processors-state-stores">Connecting
Processors and State Stores</a></h4>
+
+<p>
+Now that we have defined a processor (<code>WordCountProcessor</code>) and the state stores, we can construct the processor topology by connecting these processors and state stores together using the <code>Topology</code> instance. 
+In addition, users can add <em>source processors</em> with the specified Kafka topics to generate input data streams into the topology, and <em>sink processors</em> with the specified Kafka topics to generate output data streams out of the topology.
+</p>
+
+<pre class="brush: java;">
+       Topology topology = new Topology();
+
+      // add the source processor node that takes Kafka topic "source-topic" as input
+      topology.addSource("Source", "source-topic")
+
+      // add the WordCountProcessor node which takes the source processor as its upstream processor
+      .addProcessor("Process", () -> new WordCountProcessor(), "Source")
+
+      // add the count store associated with the WordCountProcessor processor
+      .addStateStore(countStoreSupplier, "Process")
+
+      // add the sink processor node that takes Kafka topic "sink-topic" as output
+      // and the WordCountProcessor node as its upstream processor
+      .addSink("Sink", "sink-topic", "Process");
+</pre>
+
+<p>There are several steps in the above implementation to build the topology, and here
is a quick walk-through:</p>
+<ul>
+   <li>A source processor node named &quot;Source&quot; is added to the topology
using the <code>addSource</code> method, with one Kafka topic &quot;source-topic&quot;
fed to it.</li>
+   <li>A processor node named &quot;Process&quot; with the pre-defined <code>WordCountProcessor</code>
logic is then added as the downstream processor of the &quot;Source&quot; node using
the <code>addProcessor</code> method.</li>
+   <li>A predefined persistent key-value state store is created and associated with
the &quot;Process&quot; node, using <code>countStoreSupplier</code>.</li>
+   <li>A sink processor node is then added to complete the topology using the <code>addSink</code>
method, taking the &quot;Process&quot; node as its upstream processor and writing
to a separate &quot;sink-topic&quot; Kafka topic.</li>
+</ul>
+
+<p>
+In this topology, the &quot;Process&quot; stream processor node is considered a downstream
processor of the &quot;Source&quot; node, and an upstream processor of the &quot;Sink&quot;
node. 
+As a result, whenever the &quot;Source&quot; node forwards a newly fetched record from Kafka to its downstream &quot;Process&quot; node, the <code>WordCountProcessor#process()</code> method is triggered to process the record and update the associated state store; and whenever <code>context#forward()</code> is called in the <code>WordCountProcessor#punctuate()</code> method, the aggregate key-value pair will be sent via the &quot;Sink&quot; processor node to the Kafka topic &quot;sink-topic&quot;. 
+Note that in the <code>WordCountProcessor</code> implementation, users need to refer to the same store name &quot;Counts&quot; when accessing the key-value store; otherwise an exception will be thrown at runtime, indicating that the state store cannot be found. 
+Also, if the state store itself is not associated with the processor in the <code>Topology</code> code, accessing it in the processor's <code>init()</code> method will throw an exception at runtime, indicating that the state store is not accessible from this processor.
+</p>
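<p>
For reference, the store lookup inside <code>WordCountProcessor#init()</code> (a hypothetical body, sketching only the lookup by name via <code>ProcessorContext#getStateStore</code>) must use the same &quot;Counts&quot; name:
</p>

<pre class="brush: java;">
  import org.apache.kafka.streams.processor.ProcessorContext;
  import org.apache.kafka.streams.state.KeyValueStore;

  private ProcessorContext context;
  private KeyValueStore&lt;String, Long&gt; kvStore;

  @Override
  @SuppressWarnings("unchecked")
  public void init(final ProcessorContext context) {
      this.context = context;
      // the name must match the one used when the store was created, otherwise this throws at runtime
      this.kvStore = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("Counts");
  }
</pre>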
 
-    .addSink("SINK1", "sink-topic1", "PROCESS1")
-    .addSink("SINK2", "sink-topic2", "PROCESS2")
-    .addSink("SINK3", "sink-topic3", "PROCESS3");
-    </pre>
 
     <h4><a id="streams_processor_describe" href="#streams_processor_describe">Describe
a <code>Topology</code></a></h4>
 

