kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Remove deprecated parameter in ProcessorContext#register (#4911)
Date Mon, 07 May 2018 16:22:30 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 32e97b1  MINOR: Remove deprecated parameter in ProcessorContext#register (#4911)
32e97b1 is described below

commit 32e97b1d9db46cab526d1882eaf9633934ed21bd
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Mon May 7 09:22:26 2018 -0700

    MINOR: Remove deprecated parameter in ProcessorContext#register (#4911)
    
    Updated the upgrade doc as well since we do not have an overloaded function without the
deprecated parameter before. Also renamed the 1.2 release version to 2.0.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 docs/streams/upgrade-guide.html                    | 73 +++++++++++-----------
 docs/upgrade.html                                  | 25 ++++----
 .../examples/wordcount/WordCountProcessorTest.java |  2 +-
 .../kstream/internals/KStreamTransformValues.java  |  3 +-
 .../kafka/streams/processor/ProcessorContext.java  |  7 +--
 .../internals/AbstractProcessorContext.java        |  1 -
 .../state/internals/InMemoryKeyValueStore.java     |  2 +-
 .../streams/state/internals/MemoryLRUCache.java    |  2 +-
 .../internals/RocksDBSegmentedBytesStore.java      |  2 +-
 .../streams/state/internals/RocksDBStore.java      |  2 +-
 .../internals/AbstractProcessorContextTest.java    |  6 +-
 .../internals/ProcessorStateManagerTest.java       |  2 +-
 .../streams/state/KeyValueStoreTestDriver.java     |  2 +-
 .../kafka/test/InternalMockProcessorContext.java   |  1 -
 .../java/org/apache/kafka/test/MockStateStore.java |  2 +-
 .../apache/kafka/test/NoOpProcessorContext.java    |  1 -
 .../streams/processor/MockProcessorContext.java    |  1 -
 .../kafka/streams/MockProcessorContextTest.java    |  2 +-
 18 files changed, 65 insertions(+), 71 deletions(-)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 462824f..646908d 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -34,28 +34,28 @@
     </div>
 
     <p>
-        If you want to upgrade from 1.1.x to 1.2.0 and you have customized window store implementations
on the <code>ReadOnlyWindowStore</code> interface
+        If you want to upgrade from 1.1.x to 2.0.0 and you have customized window store implementations
on the <code>ReadOnlyWindowStore</code> interface
         you'd need to update your code to incorporate the newly added public APIs; otherwise
you don't need to make any code changes.
-        See <a href="#streams_api_changes_120">below</a> for a complete list
of 1.2.0 API and semantic changes that allow you to advance your application and/or simplify
your code base.
+        See <a href="#streams_api_changes_200">below</a> for a complete list
of 2.0.0 API and semantic changes that allow you to advance your application and/or simplify
your code base.
     </p>
 
     <p>
-        If you want to upgrade from 1.0.x to 1.2.0 and you have customized window store implementations
on the <code>ReadOnlyWindowStore</code> interface
+        If you want to upgrade from 1.0.x to 2.0.0 and you have customized window store implementations
on the <code>ReadOnlyWindowStore</code> interface
         you'd need to update your code to incorporate the newly added public APIs.
         Otherwise, if you are using Java 7 you don't need to make any code changes as the
public API is fully backward compatible;
         but if you are using Java 8 method references in your Kafka Streams code you might
need to update your code to resolve method ambiguities.
         Hot-swaping the jar-file only might not work for this case.
-        See below a complete list of <a href="#streams_api_changes_120">1.2.0</a>
and <a href="#streams_api_changes_110">1.1.0</a>
+        See below a complete list of <a href="#streams_api_changes_200">2.0.0</a>
and <a href="#streams_api_changes_110">1.1.0</a>
         API and semantic changes that allow you to advance your application and/or simplify
your code base.
     </p>
 
     <p>
-        If you want to upgrade from 0.10.2.x or 0.11.0.x to 1.2.x and you have customized
window store implementations on the <code>ReadOnlyWindowStore</code> interface
+        If you want to upgrade from 0.10.2.x or 0.11.0.x to 2.0.x and you have customized
window store implementations on the <code>ReadOnlyWindowStore</code> interface
         you'd need to update your code to incorporate the newly added public APIs.
         Otherwise, if you are using Java 7 you don't need to do any code changes as the public
API is fully backward compatible;
         but if you are using Java 8 method references in your Kafka Streams code you might
need to update your code to resolve method ambiguities.
         However, some public APIs were deprecated and thus it is recommended to update your
code eventually to allow for future upgrades.
-        See below a complete list of <a href="#streams_api_changes_120">1.2</a>,
<a href="#streams_api_changes_110">1.1</a>,
+        See below a complete list of <a href="#streams_api_changes_200">2.0</a>,
<a href="#streams_api_changes_110">1.1</a>,
         <a href="#streams_api_changes_100">1.0</a>, and <a href="#streams_api_changes_0110">0.11.0</a>
API
         and semantic changes that allow you to advance your application and/or simplify your
code base, including the usage of new features.
         Additionally, Streams API 1.1.x requires broker on-disk message format version 0.10
or higher; thus, you need to make sure that the message
@@ -63,45 +63,43 @@
     </p>
 
     <p>
-        If you want to upgrade from 0.10.1.x to 1.2.x see the Upgrade Sections for <a
href="/{{version}}/documentation/#upgrade_1020_streams"><b>0.10.2</b></a>,
+        If you want to upgrade from 0.10.1.x to 2.0.x see the Upgrade Sections for <a
href="/{{version}}/documentation/#upgrade_1020_streams"><b>0.10.2</b></a>,
         <a href="/{{version}}/documentation/#upgrade_1100_streams"><b>0.11.0</b></a>,
         <a href="/{{version}}/documentation/#upgrade_100_streams"><b>1.0</b></a>,
-        <a href="/{{version}}/documentation/#upgrade_100_streams"><b>1.0</b></a>,
and
-        <a href="/{{version}}/documentation/#upgrade_110_streams"><b>1.2</b></a>.
-        Note, that a brokers on-disk message format must be on version 0.10 or higher to
run a Kafka Streams application version 1.2 or higher.
+        <a href="/{{version}}/documentation/#upgrade_110_streams"><b>1.1</b></a>,
and
+        <a href="/{{version}}/documentation/#upgrade_200_streams"><b>2.0</b></a>.
+        Note, that a brokers on-disk message format must be on version 0.10 or higher to
run a Kafka Streams application version 2.0 or higher.
         See below a complete list of <a href="#streams_api_changes_0102">0.10.2</a>,
<a href="#streams_api_changes_0110">0.11.0</a>,
-        <a href="#streams_api_changes_100">1.0</a>, <a href="#streams_api_changes_110">1.1</a>,
and <a href="#streams_api_changes_120">1.2</a>
+        <a href="#streams_api_changes_100">1.0</a>, <a href="#streams_api_changes_110">1.1</a>,
and <a href="#streams_api_changes_200">2.0</a>
         API and semantical changes that allow you to advance your application and/or simplify
your code base, including the usage of new features.
     </p>
 
     <p>
-        Upgrading from 0.10.0.x to 1.2.0 directly is also possible.
+        Upgrading from 0.10.0.x to 2.0.0 directly is also possible.
         Note, that a brokers must be on version 0.10.1 or higher and on-disk message format
must be on version 0.10 or higher
-        to run a Kafka Streams application version 1.2 or higher.
+        to run a Kafka Streams application version 2.0 or higher.
         See <a href="#streams_api_changes_0101">Streams API changes in 0.10.1</a>,
<a href="#streams_api_changes_0102">Streams API changes in 0.10.2</a>,
         <a href="#streams_api_changes_0110">Streams API changes in 0.11.0</a>,
<a href="#streams_api_changes_100">Streams API changes in 1.0</a>, and
-        <a href="#streams_api_changes_110">Streams API changes in 1.1</a>, and
<a href="#streams_api_changes_120">Streams API changes in 1.2</a>
+        <a href="#streams_api_changes_110">Streams API changes in 1.1</a>, and
<a href="#streams_api_changes_200">Streams API changes in 2.0</a>
         for a complete list of API changes.
-        Upgrading to 1.2.0 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code>
set for first upgrade phase
+        Upgrading to 2.0.0 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code>
set for first upgrade phase
         (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
         As an alternative, an offline upgrade is also possible.
     </p>
     <ul>
-        <li> prepare your application instances for a rolling bounce and make sure
that config <code>upgrade.from</code> is set to <code>"0.10.0"</code>
for new version 1.2.0</li>
+        <li> prepare your application instances for a rolling bounce and make sure
that config <code>upgrade.from</code> is set to <code>"0.10.0"</code>
for new version 2.0.0</li>
         <li> bounce each instance of your application once </li>
-        <li> prepare your newly deployed 1.2.0 application instances for a second round
of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code>
</li>
+        <li> prepare your newly deployed 2.0.0 application instances for a second round
of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code>
</li>
         <li> bounce each instance of your application once more to complete the upgrade
</li>
     </ul>
-    <p> Upgrading from 0.10.0.x to 1.2.0 in offline mode: </p>
+    <p> Upgrading from 0.10.0.x to 2.0.0 in offline mode: </p>
     <ul>
         <li> stop all old (0.10.0.x) application instances </li>
         <li> update your code and swap old code and jar file with new code and new
jar file </li>
-        <li> restart all new (1.2.0) application instances </li>
+        <li> restart all new (2.0.0) application instances </li>
     </ul>
 
-    <!-- TODO: verify release verion and update `id` and `href` attributes (also at other
places that link to this headline) -->
-    
-    <h3><a id="streams_api_changes_120" href="#streams_api_changes_120">Streams
API changes in 1.2.0</a></h3>
+    <h3><a id="streams_api_changes_200" href="#streams_api_changes_200">Streams
API changes in 2.0.0</a></h3>
     <p>
         We have removed the <code>skippedDueToDeserializationError-rate</code>
and <code>skippedDueToDeserializationError-total</code> metrics.
         Deserialization errors, and all other causes of record skipping, are now accounted
for in the pre-existing metrics
@@ -138,7 +136,6 @@
         For users who have customized window store implementations on the above interface,
they'd need to update their code to implement the newly added method as well.
         For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-261%3A+Add+Single+Value+Fetch+in+Window+Stores">KIP-261</a>.
     </p>
-
     <p>
         We have added public <code>WindowedSerdes</code> to allow users to read
from / write to a topic storing windowed table changelogs directly.
         In addition, in <code>StreamsConfig</code> we have also added <code>default.windowed.key.serde.inner</code>
and <code>default.windowed.value.serde.inner</code>
@@ -151,22 +148,26 @@
         For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-245%3A+Use+Properties+instead+of+StreamsConfig+in+KafkaStreams+constructor">KIP-245</a>.
     </p>
     <p>
-      Kafka 1.2.0 allows to manipulate timestamps of output records using the Processor API
(<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API">KIP-251</a>).
-      To enable this new feature, <code>ProcessorContext#forward(...)</code>
was modified.
-      The two existing overloads <code>#forward(Object key, Object value, String childName)</code>
and <code>#forward(Object key, Object value, int childIndex)</code> were deprecated
and a new overload <code>#forward(Object key, Object value, To to)</code> was
added.
-      The new class <code>To</code> allows you to send records to all or specific
downstream processors by name and to set the timestamp for the output record.
-      Forwarding based on child index is not supported in the new API any longer.
+        Kafka 2.0.0 allows to manipulate timestamps of output records using the Processor
API (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API">KIP-251</a>).
+        To enable this new feature, <code>ProcessorContext#forward(...)</code>
was modified.
+        The two existing overloads <code>#forward(Object key, Object value, String
childName)</code> and <code>#forward(Object key, Object value, int childIndex)</code>
were deprecated and a new overload <code>#forward(Object key, Object value, To to)</code>
was added.
+        The new class <code>To</code> allows you to send records to all or specific
downstream processors by name and to set the timestamp for the output record.
+        Forwarding based on child index is not supported in the new API any longer.
     </p>
     <p>
         <a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed
the retention time for repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.
         Instead of relying on data retention Kafka Streams uses the new purge data API to
delete consumed data from those topics and to keep used storage small now.
     </p>
     <p>
-      Kafka Streams DSL for Scala is a new Kafka Streams client library available for developers
authoring Kafka Streams applications in Scala.  It wraps core Kafka Streams DSL types to make
it easier to call when
-      interoperating with Scala code.  For example, it includes higher order functions as
parameters for transformations avoiding the need anonymous classes in Java 7 or experimental
SAM type conversions in Scala 2.11, automatic conversion between Java and Scala collection
types, a way
-      to implicitly provide SerDes to reduce boilerplate from your application and make it
more typesafe, and more!  For more information see the
-      <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#scala-dsl">Kafka
Streams DSL for Scala documentation</a> and
-      <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams">KIP-270</a>.
+        We have modified the <code>ProcessorStateManger#register(...)</code>
signature and removed the deprecated <code>loggingEnabled</code> boolean parameter
as it is specified in the <code>StoreBuilder</code>.
+        Users who used this function to register their state stores into the processor topology
need to simply update their code and remove this parameter from the caller.
+    </p>
+    <p>
+        Kafka Streams DSL for Scala is a new Kafka Streams client library available for developers
authoring Kafka Streams applications in Scala.  It wraps core Kafka Streams DSL types to make
it easier to call when
+        interoperating with Scala code.  For example, it includes higher order functions
as parameters for transformations avoiding the need anonymous classes in Java 7 or experimental
SAM type conversions in Scala 2.11, automatic conversion between Java and Scala collection
types, a way
+        to implicitly provide SerDes to reduce boilerplate from your application and make
it more typesafe, and more!  For more information see the
+        <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#scala-dsl">Kafka
Streams DSL for Scala documentation</a> and
+        <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams">KIP-270</a>.
     </p>
 
     <h3><a id="streams_api_changes_110" href="#streams_api_changes_110">Streams
API changes in 1.1.0</a></h3>
@@ -177,9 +178,9 @@
     </p>
 
     <p>
-	There is a new artifact <code>kafka-streams-test-utils</code> providing a <code>TopologyTestDriver</code>,
<code>ConsumerRecordFactory</code>, and <code>OutputVerifier</code>
class.
-	You can include the new artifact as a regular dependency to your unit tests and use the
test driver to test your business logic of your Kafka Streams application.
-	For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams">KIP-247</a>.
+	    There is a new artifact <code>kafka-streams-test-utils</code> providing
a <code>TopologyTestDriver</code>, <code>ConsumerRecordFactory</code>,
and <code>OutputVerifier</code> class.
+	    You can include the new artifact as a regular dependency to your unit tests and use
the test driver to test your business logic of your Kafka Streams application.
+	    For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams">KIP-247</a>.
     </p>
 
     <p>
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 8bfc61e..451f103 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -20,9 +20,9 @@
 <script id="upgrade-template" type="text/x-handlebars-template">
 
 
-<h4><a id="upgrade_1_2_0" href="#upgrade_1_2_0">Upgrading from 0.8.x, 0.9.x,
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 1.2.x</a></h4>
-<p>Kafka 1.2.0 introduces wire protocol changes. By following the recommended rolling
upgrade plan below,
-    you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_120_notable">notable
changes in 1.2.0</a> before upgrading.
+<h4><a id="upgrade_2_0_0" href="#upgrade_2_0_0">Upgrading from 0.8.x, 0.9.x,
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 2.0.x</a></h4>
+<p>Kafka 2.0.0 introduces wire protocol changes. By following the recommended rolling
upgrade plan below,
+    you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_200_notable">notable
changes in 2.0.0</a> before upgrading.
 </p>
 
 <p><b>For a rolling upgrade:</b></p>
@@ -48,7 +48,7 @@
     <li> Restart the brokers one by one for the new protocol version to take effect.</li>
     <li> If you have overridden the message format version as instructed above, then
you need to do one more rolling restart to
         upgrade it to its latest version. Once all (or most) consumers have been upgraded
to 0.11.0 or later,
-        change log.message.format.version to 1.2 on each broker and restart them one by one.
Note that the older Scala consumer
+        change log.message.format.version to 2.0 on each broker and restart them one by one.
Note that the older Scala consumer
         does not support the new message format introduced in 0.11, so to avoid the performance
cost of down-conversion (or to
         take advantage of <a href="#upgrade_11_exactly_once_semantics">exactly once
semantics</a>), the newer Java consumer must be used.</li>
 </ol>
@@ -64,7 +64,7 @@
         Hot-swapping the jar-file only might not work.</li>
 </ol>
 
-<h5><a id="upgrade_120_notable" href="#upgrade_120_notable">Notable changes in
1.2.0</a></h5>
+<h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in
2.0.0</a></h5>
 <ul>
     <li><a href="https://cwiki.apache.org/confluence/x/oYtjB">KIP-186</a>
increases the default offset retention time from 1 day to 7 days. This makes it less likely
to "lose" offsets in an application that commits infrequently. It also increases the active
set of offsets and therefore can increase memory usage on the broker. Note that the console
consumer currently enables offset commit by default and can be the source of a large number
of offsets which this change will now preserve for [...]
     <li><a href="https://issues.apache.org/jira/browse/KAFKA-5674">KAFKA-5674</a>
extends the lower interval of <code>max.connections.per.ip minimum</code> to zero
and therefore allows IP-based filtering of inbound connections.</li>
@@ -74,18 +74,19 @@
         JMX monitoring tools that do not automatically aggregate. To get the total count
for a specific request type, the tool needs to be
         updated to aggregate across different versions.
     </li>
-    <li> New Kafka Streams configuration parameter <code>upgrade.from</code>
added that allows rolling bounce upgrade from older version. </li>
-    <li><a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a>
changed the retention time for repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.</li>
+    <li>New Kafka Streams configuration parameter <code>upgrade.from</code>
added that allows rolling bounce upgrade from older version. </li>
+    <li><a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a>
changed the retention time for Kafka Streams repartition topics by setting its default value
to <code>Long.MAX_VALUE</code>.</li>
+    <li>Updated <code>ProcessorStateManager</code> APIs in Kafka Streams
for registering state stores to the processor topology. For more details please read the Streams
<a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_200">Upgrade
Guide</a>.</li>
 </ul>
 
-<h5><a id="upgrade_120_new_protocols" href="#upgrade_120_new_protocols">New Protocol
Versions</a></h5>
+<h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol
Versions</a></h5>
 <ul></ul>
 
-<h5><a id="upgrade_120_streams" href="#upgrade_120_streams">Upgrading a 1.2.0
Kafka Streams Application</a></h5>
+<h5><a id="upgrade_200_streams" href="#upgrade_200_streams">Upgrading a 2.0.0
Kafka Streams Application</a></h5>
 <ul>
-    <li> Upgrading your Streams application from 1.1.0 to 1.2.0 does not require a
broker upgrade.
-         A Kafka Streams 1.2.0 application can connect to 1.2, 1.1, 1.0, 0.11.0, 0.10.2 and
0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
-    <li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_120">Streams
API changes in 1.2.0</a> for more details. </li>
+    <li> Upgrading your Streams application from 1.1.0 to 2.0.0 does not require a
broker upgrade.
+         A Kafka Streams 2.0.0 application can connect to 2.0, 1.1, 1.0, 0.11.0, 0.10.2 and
0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
+    <li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_200">Streams
API changes in 2.0.0</a> for more details. </li>
 </ul>
 
 <h4><a id="upgrade_1_1_0" href="#upgrade_1_1_0">Upgrading from 0.8.x, 0.9.x,
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x or 1.0.x to 1.1.x</a></h4>
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
index 566b7d4..faced6d 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
@@ -44,7 +44,7 @@ public class WordCountProcessorTest {
                 .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
                 .build();
         store.init(context, store);
-        context.register(store, false, null);
+        context.register(store, null);
 
         // Create and initialize the processor under test
         final Processor<String, String> processor = new WordCountProcessorDemo.MyProcessorSupplier().get();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index d09fae2..fb6af34 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -91,9 +91,8 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K,
V>
 
                     @Override
                     public void register(final StateStore store,
-                                         final boolean deprecatedAndIgnoredLoggingEnabled,
                                          final StateRestoreCallback stateRestoreCallback)
{
-                        context.register(store, deprecatedAndIgnoredLoggingEnabled, stateRestoreCallback);
+                        context.register(store, stateRestoreCallback);
                     }
 
                     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 93a1455..79d191c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -76,15 +76,12 @@ public interface ProcessorContext {
      * Registers and possibly restores the specified storage engine.
      *
      * @param store the storage engine
-     * @param loggingEnabledIsDeprecatedAndIgnored deprecated parameter {@code loggingEnabled}
is ignored:
-     *                                             if you want to enable logging on a state
stores call
-     *                                             {@link org.apache.kafka.streams.state.StoreBuilder#withLoggingEnabled(Map)}
-     *                                             when creating the store
+     * @param stateRestoreCallback the restoration callback logic for log-backed state stores
upon restart
+     *
      * @throws IllegalStateException If store gets registered after initialized is already
finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
     void register(final StateStore store,
-                  final boolean loggingEnabledIsDeprecatedAndIgnored,
                   final StateRestoreCallback stateRestoreCallback);
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index fc3067e..9687477 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -92,7 +92,6 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
 
     @Override
     public void register(final StateStore store,
-                         final boolean deprecatedAndIgnoredLoggingEnabled,
                          final StateRestoreCallback stateRestoreCallback) {
         if (initialized) {
             throw new IllegalStateException("Can only create state stores during initialization.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 2cdfc4b..9ea75a9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -72,7 +72,7 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K,
V> {
 
         if (root != null) {
             // register the store
-            context.register(root, false, new StateRestoreCallback() {
+            context.register(root, new StateRestoreCallback() {
                 @Override
                 public void restore(byte[] key, byte[] value) {
                     // this is a delete
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 2785540..b99c907 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -111,7 +111,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K,
V> {
             valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         // register the store
-        context.register(root, false, new StateRestoreCallback() {
+        context.register(root, new StateRestoreCallback() {
             @Override
             public void restore(byte[] key, byte[] value) {
                 restoring = true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 865703c..ec9e6f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -126,7 +126,7 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
         segments.openExisting(context);
 
         // register and possibly restore the state from the logs
-        context.register(root, false, new StateRestoreCallback() {
+        context.register(root, new StateRestoreCallback() {
             @Override
             public void restore(byte[] key, byte[] value) {
                 put(Bytes.wrap(key), value);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index f54c783..2813041 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -169,7 +169,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>
{
 
         // value getter should always read directly from rocksDB
         // since it is only for values that are already flushed
-        context.register(root, false, this.batchingStateRestoreCallback);
+        context.register(root, this.batchingStateRestoreCallback);
     }
 
     private RocksDB openDB(final File dir,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 43dc38e..86806b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -55,7 +55,7 @@ public class AbstractProcessorContextTest {
     public void shouldThrowIllegalStateExceptionOnRegisterWhenContextIsInitialized() {
         context.initialized();
         try {
-            context.register(stateStore, false, null);
+            context.register(stateStore, null);
             fail("should throw illegal state exception when context already initialized");
         } catch (IllegalStateException e) {
             // pass
@@ -64,12 +64,12 @@ public class AbstractProcessorContextTest {
 
     @Test
     public void shouldNotThrowIllegalStateExceptionOnRegisterWhenContextIsNotInitialized()
{
-        context.register(stateStore, false, null);
+        context.register(stateStore, null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnRegisterIfStateStoreIsNull() {
-        context.register(null, false, null);
+        context.register(null, null);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 31f07cc..6a20cd9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -687,7 +687,7 @@ public class ProcessorStateManagerTest {
 
         stateManager.reinitializeStateStoresForPartitions(changelogPartitions, new NoOpProcessorContext()
{
             @Override
-            public void register(final StateStore store, final boolean deprecatedAndIgnoredLoggingEnabled,
final StateRestoreCallback stateRestoreCallback) {
+            public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback)
{
                 stateManager.register(store, stateRestoreCallback);
             }
         });
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 33591c6..4e80fa7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -98,7 +98,7 @@ import java.util.Set;
  *
  * <h2>Restoring a store</h2>
  * This component can be used to test whether a {@link KeyValueStore} implementation properly
- * {@link ProcessorContext#register(StateStore, boolean, StateRestoreCallback) registers
itself} with the {@link ProcessorContext}, so that
+ * {@link ProcessorContext#register(StateStore, StateRestoreCallback) registers itself} with
the {@link ProcessorContext}, so that
  * the persisted contents of a store are properly restored from the flushed entries when
the store instance is started.
  * <p>
  * To do this, create an instance of this driver component, {@link #addEntryToRestoreLog(Object,
Object) add entries} that will be
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 27a0094..eb72e13 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -181,7 +181,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext
imple
 
     @Override
     public void register(final StateStore store,
-                         final boolean deprecatedAndIgnoredLoggingEnabled,
                          final StateRestoreCallback func) {
         storeMap.put(store.name(), store);
         restoreFuncs.put(store.name(), func);
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStore.java b/streams/src/test/java/org/apache/kafka/test/MockStateStore.java
index f218f04..a2b0d21 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStore.java
@@ -47,7 +47,7 @@ public class MockStateStore implements StateStore {
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        context.register(root, false, stateRestoreCallback);
+        context.register(root, stateRestoreCallback);
         initialized = true;
         closed = false;
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index e931c7e..92f84c5 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -87,7 +87,6 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
 
     @Override
     public void register(final StateStore store,
-                         final boolean deprecatedAndIgnoredLoggingEnabled,
                          final StateRestoreCallback stateRestoreCallback) {
         // no-op
     }
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index c387c36..3e29cde 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -337,7 +337,6 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     @Override
     public void register(final StateStore store,
-                         final boolean loggingEnabledIsDeprecatedAndIgnored,
                          final StateRestoreCallback stateRestoreCallbackIsIgnoredInMock)
{
         stateStores.put(store.name(), store);
     }
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 934e043..dbb26e0 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -240,7 +240,7 @@ public class MockProcessorContextTest {
 
         final MockProcessorContext context = new MockProcessorContext();
         final KeyValueStore<String, Long> store = new InMemoryKeyValueStore<>("my-state",
Serdes.String(), Serdes.Long());
-        context.register(store, false, null);
+        context.register(store, null);
 
         store.init(context, store);
         processor.init(context);

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message