kafka-commits mailing list archives

From damian...@apache.org
Subject kafka git commit: MINOR: streams memory management docs
Date Wed, 09 Aug 2017 10:56:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 3bc5ee7dd -> d05112125

MINOR: streams memory management docs

update streams memory management docs

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Bill Bejeck <bbejeck@gmail.com>, Eno Thereska <eno.thereska@gmail.com>,
Guozhang Wang <wangguoz@gmail.com>

Closes #3633 from dguy/mem-doc-011

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0511212
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0511212
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0511212

Branch: refs/heads/0.11.0
Commit: d05112125614ded3febe7fe7ffcdfefd0dd2eb6f
Parents: 3bc5ee7
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed Aug 9 11:55:57 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Wed Aug 9 11:55:57 2017 +0100

 .../streams-cache-and-commit-interval.png       | Bin 0 -> 38648 bytes
 docs/streams/developer-guide.html               | 175 +++++++++++++++++++
 2 files changed, 175 insertions(+)

diff --git a/docs/images/streams-cache-and-commit-interval.png b/docs/images/streams-cache-and-commit-interval.png
new file mode 100644
index 0000000..a663bc6
Binary files /dev/null and b/docs/images/streams-cache-and-commit-interval.png differ

diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 6f08f38..15298a7 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -924,6 +924,181 @@
         <li>Collectively, this allows us to query the full state of the entire application</li>
+    <h3><a id="streams_developer-guide_memory-management" href="#streams_developer-guide_memory-management">Memory Management</a></h3>
+    <h4><a id="streams_developer-guide_memory-management_record-cache" href="#streams_developer-guide_memory-management_record-cache">Record caches in the DSL</a></h4>
+    <p>
+    Developers of an application using the DSL have the option to specify, for an instance of a processing topology, the
+    total memory (RAM) size of the record cache that is leveraged by the following <code>KTable</code> instances:
+    </p>
+    <ol>
+        <li>Source <code>KTable</code>, i.e. <code>KTable</code> instances that are created via <code>KStreamBuilder#table()</code> or <code>KStreamBuilder#globalTable()</code>.</li>
+        <li>Aggregation <code>KTable</code>, i.e. instances of <code>KTable</code> that are created as a result of aggregations.</li>
+    </ol>
+    <p>
+        For such <code>KTable</code> instances, the record cache is used for:
+    </p>
+    <ol>
+        <li>Internal caching and compacting of output records before they are written by the underlying stateful processor node to its internal state store.</li>
+        <li>Internal caching and compacting of output records before they are forwarded from the underlying stateful processor node to any of its downstream processor nodes.</li>
+    </ol>
+    <p>
+        Here is a motivating example:
+    </p>
+    <ul>
+        <li>Imagine the input is a <code>KStream&lt;String, Integer&gt;</code> with the records <code>&lt;A, 1&gt;, &lt;D, 5&gt;, &lt;A, 20&gt;, &lt;A, 300&gt;</code>.
+            Note that the focus in this example is on the records with key == <code>A</code>.
+        </li>
+        <li>
+            An aggregation computes the sum of record values, grouped by key, for the input above and returns a <code>KTable&lt;String, Integer&gt;</code>.
+            <ul>
+                <li><b>Without caching</b>, what is emitted for key <code>A</code> is a sequence of output records that represent changes in the
+                    resulting aggregation table (here, the parentheses denote changes, where the left and right numbers denote the new
+                    aggregate value and the previous aggregate value, respectively):
+                    <code>&lt;A, (1, null)&gt;, &lt;A, (21, 1)&gt;, &lt;A, (321, 21)&gt;</code>.</li>
+                <li>
+                    <b>With caching</b>, the aforementioned three output records for key <code>A</code> would likely be compacted in the cache,
+                    leading to a single output record <code>&lt;A, (321, null)&gt;</code> that is written to the aggregation's internal state store
+                    and forwarded to any downstream operations.
+                </li>
+            </ul>
+        </li>
+    </ul>
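The with-versus-without caching behavior above can be sketched in plain Java. This is a simplified simulation, not actual Kafka Streams internals; the class name, the `aggregateDemo` method, and the output format are invented for illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CacheCompactionSketch {

    // Sums values per key for the example input <A,1>, <D,5>, <A,20>, <A,300>.
    // Each output string is "key,(newAggregate,previousAggregate)".
    static List<String> aggregateDemo(boolean caching) {
        List<Map.Entry<String, Integer>> input = Arrays.asList(
                new AbstractMap.SimpleEntry<>("A", 1),
                new AbstractMap.SimpleEntry<>("D", 5),
                new AbstractMap.SimpleEntry<>("A", 20),
                new AbstractMap.SimpleEntry<>("A", 300));
        Map<String, Integer> table = new LinkedHashMap<>(); // aggregation state
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, Integer> record : input) {
            Integer prev = table.get(record.getKey());
            int next = (prev == null ? 0 : prev) + record.getValue();
            table.put(record.getKey(), next);
            if (!caching) {
                // Without caching, every single update is emitted downstream.
                output.add(record.getKey() + ",(" + next + "," + prev + ")");
            }
        }
        if (caching) {
            // With caching (and enough cache space), updates per key are
            // compacted and only the final value is emitted on flush.
            for (Map.Entry<String, Integer> e : table.entrySet()) {
                output.add(e.getKey() + ",(" + e.getValue() + ",null)");
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(aggregateDemo(false)); // [A,(1,null), D,(5,null), A,(21,1), A,(321,21)]
        System.out.println(aggregateDemo(true));  // [A,(321,null), D,(5,null)]
    }
}
```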
+    <p>
+        The cache size is specified through the <code>cache.max.bytes.buffering</code> parameter, which is a global setting per processing topology:
+    </p>
+    <pre class="brush: java;">
+        // Enable record cache of size 10 MB.
+        Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
+    </pre>
+    <p>
+        This parameter controls the number of bytes allocated for caching.
+        Specifically, for a processor topology instance with <code>T</code> threads and <code>C</code> bytes allocated for caching,
+        each thread will have an even <code>C/T</code> bytes to construct its own cache and use as it sees fit among its tasks.
+        I.e., there are as many caches as there are threads, but no sharing of caches across threads happens.
+        The basic API for the cache is made of <code>put()</code> and <code>get()</code> calls.
+        Records are evicted using a simple LRU scheme once the cache size is reached.
+        The first time a keyed record <code>R1 = &lt;K1, V1&gt;</code> finishes processing at a node, it is marked as dirty in the cache.
+        Any other keyed record <code>R2 = &lt;K1, V2&gt;</code> with the same key <code>K1</code> that is processed on that node during that time will
+        overwrite <code>&lt;K1, V1&gt;</code>, which we also refer to as "being compacted".
+        Note that this has the same effect as <a href="https://kafka.apache.org/documentation.html#compaction">Kafka's log compaction</a>, but happens (a) earlier, while the
+        records are still in memory, and (b) within your client-side application rather than on the server side, i.e. the Kafka broker.
+        Upon flushing, <code>R2</code> is (1) forwarded to the next processing node and (2) written to the local state store.
+    </p>
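The per-thread cache split and LRU eviction described above can be sketched as follows. This is a toy model: it bounds the number of entries rather than bytes, and the class and method names are invented for illustration, not actual Streams code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ThreadCacheSketch {

    // Even split: each of T threads gets C/T bytes for its own cache.
    static long perThreadCacheBytes(long totalBytes, int numThreads) {
        return totalBytes / numThreads;
    }

    // Simplified LRU cache: evicts the least-recently-used entry once
    // maxEntries is exceeded (the real cache bounds bytes, not entries).
    static <K, V> Map<K, V> lruCache(int maxEntries) {
        return new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    public static void main(String[] args) {
        // 10 MB shared evenly across 4 stream threads -> 2.5 MB each.
        System.out.println(perThreadCacheBytes(10 * 1024 * 1024L, 4)); // 2621440
        Map<String, Integer> cache = lruCache(3);
        cache.put("blue", 1);
        cache.put("red", 2);
        cache.put("yellow", 3);
        cache.put("green", 4); // evicts "blue", the least recently used
        System.out.println(cache.keySet()); // [red, yellow, green]
    }
}
```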
+    <p>
+        The semantics of caching is that data is flushed to the state store and forwarded to the next downstream processor node
+        whenever the earlier of the <code>commit.interval.ms</code> deadline or the <code>cache.max.bytes.buffering</code> limit (cache pressure) is hit.
+        Both <code>commit.interval.ms</code> and <code>cache.max.bytes.buffering</code> are <b>global</b> parameters: they apply to all processor nodes in
+        the topology, i.e., it is not possible to specify different parameters for each node.
+        Below we provide some example settings for both parameters based on desired scenarios.
+    </p>
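The flush condition can be sketched as a simple predicate. This is illustrative only; `shouldFlush` is not a real Streams API, just a restatement of the rule above in code.

```java
public class FlushTriggerSketch {

    // Returns true if the cache should flush: either the commit interval has
    // elapsed or the cache has hit its configured byte limit (cache pressure),
    // whichever comes first.
    static boolean shouldFlush(long nowMs, long lastCommitMs, long commitIntervalMs,
                               long cacheBytesUsed, long cacheMaxBytes) {
        boolean commitIntervalElapsed = nowMs - lastCommitMs >= commitIntervalMs;
        boolean cachePressure = cacheBytesUsed >= cacheMaxBytes;
        return commitIntervalElapsed || cachePressure;
    }

    public static void main(String[] args) {
        // Commit interval elapsed, cache not full -> flush.
        System.out.println(shouldFlush(2000, 500, 1000, 100, 10_000));    // true
        // Interval not elapsed, but cache pressure -> flush.
        System.out.println(shouldFlush(600, 500, 1000, 10_000, 10_000));  // true
        // Neither condition -> keep buffering.
        System.out.println(shouldFlush(600, 500, 1000, 100, 10_000));     // false
    }
}
```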
+    <p>To turn off caching, the cache size can be set to zero:</p>
+    <pre class="brush: java;">
+        // Disable record cache
+        Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+    </pre>
+    <p>
+        Turning off caching might result in high write traffic for the underlying RocksDB store.
+        With default settings, caching is enabled within Kafka Streams but RocksDB caching is disabled.
+        Thus, to avoid high write traffic it is recommended to enable RocksDB caching if Kafka Streams caching is turned off.
+    </p>
+    <p>
+        For example, the RocksDB Block Cache could be set to 100 MB and the Write Buffer size to 32 MB.
+    </p>
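One way to apply such settings is through a custom <code>RocksDBConfigSetter</code> registered under the <code>rocksdb.config.setter</code> configuration mentioned later in this section. The following is a sketch that assumes the Kafka Streams and RocksDB dependencies are on the classpath; the sizes are the illustrative values above, not recommendations, and the class name is made up.

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public class BoundedRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(100 * 1024 * 1024L); // 100 MB block cache
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferSize(32 * 1024 * 1024L);     // 32 MB write buffer
    }
}
```

It would then be registered via <code>streamsConfiguration.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedRocksDBConfig.class);</code>.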
+    <p>
+        To enable caching but still have an upper bound on how long records will be cached, the commit interval can be set
+        appropriately (in this example, it is set to 1000 milliseconds):
+    </p>
+    <pre class="brush: java;">
+        Properties streamsConfiguration = new Properties();
+        // Enable record cache of size 10 MB.
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
+        // Set commit interval to 1 second.
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+    </pre>
+    <p>
+        The illustration below shows the effect of these two configurations visually.
+        For simplicity we have records with 4 keys: blue, red, yellow and green. Without loss of generality, let's assume the cache has space for only 3 keys.
+        When the cache is disabled, we observe that all the input records will be output. With the cache enabled, we make the following observations.
+        First, most records are output at the end of a commit interval (e.g., at <code>t1</code> one blue record is output, which is the final over-write of the blue key up to that time).
+        Second, some records are output because of cache pressure, i.e. before the end of a commit interval (cf. the red record right before t2).
+        With smaller cache sizes we expect cache pressure to be the primary factor that dictates when records are output. With larger cache sizes, the commit interval will be the primary factor.
+        Third, the number of records output has been reduced (here: from 15 to 8).
+    </p>
+    <img class="centered" src="/{{version}}/images/streams-cache-and-commit-interval.png">
+    <h4><a id="streams_developer-guide_memory-management_state-store-cache" href="#streams_developer-guide_memory-management_state-store-cache">State store caches in the Processor API</a></h4>
+    <p>
+        Developers of a Kafka Streams application using the Processor API have the option to specify, for an instance of a
+        processing topology, the total memory (RAM) size of the <i>state store cache</i> that is used for:
+    </p>
+    <ul><li>Internal <i>caching and compacting</i> of output records before they are written from a <b>stateful</b> processor node to its state stores.</li></ul>
+    <p>
+        Note that, unlike <a href="#streams_developer-guide_memory-management_record-cache">record caches</a> in the DSL, the state
+        store cache in the Processor API <i>will not cache or compact</i> any output records that are being forwarded downstream.
+        In other words, downstream processor nodes see all records, whereas the state stores see a reduced number of records.
+        It is important to note that this does not impact the correctness of the system but is merely a performance optimization
+        for the state stores.
+    </p>
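The difference can be sketched in a toy model: every record is still forwarded downstream, while pending writes to the state store are compacted per key. The class and field names are invented for illustration, not actual Streams internals.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a Processor API state store cache: downstream sees all
// records, the store sees only the compacted (latest) value per key.
public class StateStoreCacheSketch {
    final List<String> forwarded = new ArrayList<>();
    final Map<String, Integer> pendingStoreWrites = new LinkedHashMap<>();

    void process(String key, int value) {
        forwarded.add(key + "=" + value);   // downstream sees every record
        pendingStoreWrites.put(key, value); // store write compacted per key
    }

    public static void main(String[] args) {
        StateStoreCacheSketch node = new StateStoreCacheSketch();
        node.process("A", 1);
        node.process("D", 5);
        node.process("A", 20);
        node.process("A", 300);
        System.out.println(node.forwarded);          // [A=1, D=5, A=20, A=300]
        System.out.println(node.pendingStoreWrites); // {A=300, D=5}
    }
}
```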
+    <p>
+        A note on terminology: we use the narrower term <i>state store caches</i> when we refer to the Processor API and the
+        broader term <i>record caches</i> when we are writing about the DSL.
+        We made a conscious choice to not expose the more general record caches to the Processor API so that we keep it simple and flexible.
+        For example, developers of the Processor API might choose to store a record in a state store while forwarding a different value downstream, i.e., they
+        might not want to use the unified record cache for both state store updates and forwarding.
+    </p>
+    <p>
+        Following from the example first shown in section <a href="#streams_processor_statestore">State Stores</a>, to enable caching, you can
+        add the <code>enableCaching</code> call (note that caches are disabled by default and there is no explicit <code>disableCaching</code> call):
+    </p>
+    <pre class="brush: java;">
+        StateStoreSupplier countStoreSupplier =
+            Stores.create("Counts")
+                .withKeys(Serdes.String())
+                .withValues(Serdes.Long())
+                .persistent()
+                .enableCaching()
+                .build();
+    </pre>
+    <h4><a id="streams_developer-guide_memory-management_other_memory_usage" href="#streams_developer-guide_memory-management_other_memory_usage">Other memory usage</a></h4>
+    <p>
+    There are other modules inside Apache Kafka that allocate memory during runtime. They include the following:
+    </p>
+    <ul>
+        <li>Producer buffering, managed by the producer config <code>buffer.memory</code>.</li>
+        <li>Consumer buffering, currently not strictly managed, but can be indirectly controlled by fetch size, i.e.,
+            <code>fetch.max.bytes</code> and <code>fetch.max.wait.ms</code>.</li>
+        <li>Both producer and consumer also have separate TCP send / receive buffers that are not counted as the buffering memory.
+            These are controlled by the <code>send.buffer.bytes</code> / <code>receive.buffer.bytes</code> configs.</li>
+        <li>Deserialized objects buffering: after <code>consumer.poll()</code> returns records, they will be deserialized to extract
+            the timestamp and buffered in the streams space.
+            Currently this is only indirectly controlled by <code>buffered.records.per.partition</code>.</li>
+        <li>RocksDB's own memory usage, both on-heap and off-heap; critical configs (for RocksDB version 4.1.0) include
+            <code>block_cache_size</code>, <code>write_buffer_size</code> and <code>max_write_buffer_number</code>.
+            These can be specified through the <code>rocksdb.config.setter</code> configuration.</li>
+    </ul>
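The buffer-related settings from the list above can be collected into a single <code>Properties</code> object, as is usual for Kafka clients. The values below are illustrative placeholders, not recommendations; only the config key names come from the list above.

```java
import java.util.Properties;

public class MemoryConfigSketch {

    // Gathers the memory-related settings listed above into one Properties
    // object; the numeric values are illustrative, not recommendations.
    static Properties memoryConfigs() {
        Properties props = new Properties();
        props.setProperty("buffer.memory", "33554432");              // producer buffering (32 MB)
        props.setProperty("fetch.max.bytes", "52428800");            // consumer fetch size (50 MB)
        props.setProperty("fetch.max.wait.ms", "500");               // consumer fetch wait
        props.setProperty("send.buffer.bytes", "131072");            // TCP send buffer (128 KB)
        props.setProperty("receive.buffer.bytes", "65536");          // TCP receive buffer (64 KB)
        props.setProperty("buffered.records.per.partition", "1000"); // deserialized record buffering
        return props;
    }

    public static void main(String[] args) {
        System.out.println(memoryConfigs().getProperty("buffer.memory")); // 33554432
    }
}
```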
     <h3><a id="streams_configure_execute" href="#streams_configure_execute">Application Configuration and Execution</a></h3>
