This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 046b008 MINOR: improve Streams checkstyle and code cleanup (#5954)
046b008 is described below
commit 046b0087bd76637bbfd813ccef31693fa358ff2d
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Tue Dec 11 01:54:41 2018 -0800
MINOR: improve Streams checkstyle and code cleanup (#5954)
Reviewers: Guozhang Wang <guozhang@confluent.io>, Nikolay Izhikov <nIzhikov@gmail.com>, Ismael Juma <ismael@confluent.io>, Bill Bejeck <bill@confluent.io>
---
.../examples/temperature/TemperatureDemo.java | 7 +-
.../org/apache/kafka/streams/KafkaStreams.java | 98 +++++------
.../java/org/apache/kafka/streams/KeyValue.java | 3 +-
.../apache/kafka/streams/internals/ApiUtils.java | 6 +-
.../org/apache/kafka/streams/kstream/Consumed.java | 4 +-
.../apache/kafka/streams/kstream/JoinWindows.java | 69 ++++----
.../org/apache/kafka/streams/kstream/KStream.java | 49 +++---
.../org/apache/kafka/streams/kstream/KTable.java | 12 +-
.../apache/kafka/streams/kstream/Materialized.java | 5 +-
.../apache/kafka/streams/kstream/Serialized.java | 2 +-
.../kafka/streams/kstream/SessionWindows.java | 36 +++--
.../kafka/streams/kstream/TimeWindowedKStream.java | 6 +-
.../streams/kstream/TimeWindowedSerializer.java | 6 +-
.../apache/kafka/streams/kstream/TimeWindows.java | 47 +++---
.../kafka/streams/kstream/UnlimitedWindows.java | 15 +-
.../org/apache/kafka/streams/kstream/Window.java | 7 +-
.../org/apache/kafka/streams/kstream/Windowed.java | 8 +-
.../kstream/internals/CacheFlushListener.java | 4 +-
.../kafka/streams/kstream/internals/Change.java | 8 +-
.../kstream/internals/ChangedSerializer.java | 8 +-
.../kstream/internals/KStreamTransform.java | 7 +-
.../streams/kstream/internals/KTableFilter.java | 6 +-
.../streams/kstream/internals/KTableMapValues.java | 3 +-
.../streams/kstream/internals/PrintedInternal.java | 6 -
.../streams/kstream/internals/SessionWindow.java | 2 +-
.../streams/kstream/internals/TimeWindow.java | 2 +-
.../streams/kstream/internals/UnlimitedWindow.java | 2 +-
.../internals/suppress/EagerBufferConfigImpl.java | 8 +-
.../suppress/FinalResultsSuppressionBuilder.java | 8 +-
.../internals/suppress/StrictBufferConfigImpl.java | 8 +-
.../internals/suppress/SuppressedInternal.java | 8 +-
.../apache/kafka/streams/processor/Processor.java | 9 +-
.../org/apache/kafka/streams/processor/TaskId.java | 7 +-
.../processor/internals/ProcessorContextImpl.java | 4 +-
.../internals/ProcessorRecordContext.java | 8 +-
.../streams/processor/internals/QuickUnion.java | 8 +-
.../internals/RepartitionTopicConfig.java | 10 +-
.../streams/processor/internals/SourceNode.java | 9 +-
.../kafka/streams/processor/internals/Stamped.java | 17 +-
.../internals/StreamsPartitionAssignor.java | 16 +-
.../internals/UnwindowedChangelogTopicConfig.java | 10 +-
.../internals/WindowedChangelogTopicConfig.java | 10 +-
.../internals/assignment/ClientState.java | 11 +-
.../internals/assignment/StickyTaskAssignor.java | 8 +-
.../internals/metrics/StreamsMetricsImpl.java | 3 +-
.../streams/state/KeyValueBytesStoreSupplier.java | 4 +-
.../kafka/streams/state/QueryableStoreType.java | 2 +-
.../kafka/streams/state/QueryableStoreTypes.java | 7 +-
.../kafka/streams/state/ReadOnlyWindowStore.java | 13 +-
.../org/apache/kafka/streams/state/Stores.java | 11 +-
.../kafka/streams/state/StreamsMetadata.java | 23 ++-
.../streams/state/WindowBytesStoreSupplier.java | 4 +-
.../apache/kafka/streams/state/WindowStore.java | 24 +--
.../internals/CompositeReadOnlyWindowStore.java | 50 +++---
.../streams/state/internals/ContextualRecord.java | 10 +-
.../InMemoryTimeOrderedKeyValueBuffer.java | 8 +-
.../streams/state/internals/LRUCacheEntry.java | 8 +-
.../streams/state/internals/MemoryLRUCache.java | 27 ++--
.../streams/state/internals/RocksDBStore.java | 7 +-
.../kafka/streams/state/internals/Segment.java | 7 +-
.../kafka/streams/state/internals/Segments.java | 3 +-
.../integration/AbstractJoinIntegrationTest.java | 39 ++---
.../integration/RegexSourceIntegrationTest.java | 77 +++------
.../integration/RestoreIntegrationTest.java | 81 +++-------
.../streams/integration/utils/KafkaEmbedded.java | 17 +-
.../kstream/internals/AbstractStreamTest.java | 7 +-
.../apache/kafka/streams/perf/YahooBenchmark.java | 116 ++++++-------
.../processor/internals/ProcessorTopologyTest.java | 6 +-
.../internals/StreamsPartitionAssignorTest.java | 180 ++++++++-------------
.../CompositeReadOnlyWindowStoreTest.java | 3 +-
.../state/internals/ReadOnlyWindowStoreStub.java | 63 ++++----
.../kafka/streams/tests/SmokeTestDriver.java | 23 ++-
.../streams/processor/MockProcessorContext.java | 21 ++-
.../kafka/streams/MockProcessorContextTest.java | 9 +-
74 files changed, 697 insertions(+), 743 deletions(-)
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
index 93480e4..4994a5d 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.examples.temperature;
-import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -29,6 +28,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
+import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@@ -90,10 +90,11 @@ public class TemperatureDemo {
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(TEMPERATURE_WINDOW_SIZE)))
.reduce((value1, value2) -> {
- if (Integer.parseInt(value1) > Integer.parseInt(value2))
+ if (Integer.parseInt(value1) > Integer.parseInt(value2)) {
return value1;
- else
+ } else {
return value2;
+ }
})
.toStream()
.filter((key, value) -> Integer.parseInt(value) > TEMPERATURE_THRESHOLD);
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 420c51f..bbd0105 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams;
-import java.time.Duration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -63,6 +62,7 @@ import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.slf4j.Logger;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -77,7 +77,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.utils.Utils.getHost;
@@ -127,7 +126,6 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail
public class KafkaStreams implements AutoCloseable {
private static final String JMX_PREFIX = "kafka.streams";
- private static final int DEFAULT_CLOSE_TIMEOUT = 0;
// processId is expected to be unique across JVMs and to be used
// in userData of the subscription request to allow assignor be aware
@@ -135,7 +133,6 @@ public class KafkaStreams implements AutoCloseable {
// usage only and should not be exposed to users at all.
private final Time time;
private final Logger log;
- private final UUID processId;
private final String clientId;
private final Metrics metrics;
private final StreamsConfig config;
@@ -386,7 +383,9 @@ public class KafkaStreams implements AutoCloseable {
result.putAll(thread.consumerMetrics());
result.putAll(thread.adminClientMetrics());
}
- if (globalStreamThread != null) result.putAll(globalStreamThread.consumerMetrics());
+ if (globalStreamThread != null) {
+ result.putAll(globalStreamThread.consumerMetrics());
+ }
result.putAll(metrics.metrics());
return Collections.unmodifiableMap(result);
}
@@ -633,7 +632,7 @@ public class KafkaStreams implements AutoCloseable {
this.time = time;
// The application ID is a required config and hence should always have value
- processId = UUID.randomUUID();
+ final UUID processId = UUID.randomUUID();
final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
if (userClientId.length() <= 0) {
@@ -733,13 +732,10 @@ public class KafkaStreams implements AutoCloseable {
final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
- stateDirCleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(final Runnable r) {
- final Thread thread = new Thread(r, clientId + "-CleanupThread");
- thread.setDaemon(true);
- return thread;
- }
+ stateDirCleaner = Executors.newSingleThreadScheduledExecutor(r -> {
+ final Thread thread = new Thread(r, clientId + "-CleanupThread");
+ thread.setDaemon(true);
+ return thread;
});
}
@@ -791,12 +787,9 @@ public class KafkaStreams implements AutoCloseable {
}
final Long cleanupDelay = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
- stateDirCleaner.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- if (state == State.RUNNING) {
- stateDirectory.cleanRemovedTasks(cleanupDelay);
- }
+ stateDirCleaner.scheduleAtFixedRate(() -> {
+ if (state == State.RUNNING) {
+ stateDirectory.cleanRemovedTasks(cleanupDelay);
}
}, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);
@@ -814,7 +807,7 @@ public class KafkaStreams implements AutoCloseable {
* This will block until all threads have stopped.
*/
public void close() {
- close(DEFAULT_CLOSE_TIMEOUT, TimeUnit.SECONDS);
+ close(Long.MAX_VALUE);
}
/**
@@ -856,45 +849,42 @@ public class KafkaStreams implements AutoCloseable {
// wait for all threads to join in a separate thread;
// save the current thread so that if it is a stream thread
// we don't attempt to join it and cause a deadlock
- final Thread shutdownThread = new Thread(new Runnable() {
- @Override
- public void run() {
- // notify all the threads to stop; avoid deadlocks by stopping any
- // further state reports from the thread since we're shutting down
- for (final StreamThread thread : threads) {
- thread.setStateListener(null);
- thread.shutdown();
- }
+ final Thread shutdownThread = new Thread(() -> {
+ // notify all the threads to stop; avoid deadlocks by stopping any
+ // further state reports from the thread since we're shutting down
+ for (final StreamThread thread : threads) {
+ thread.setStateListener(null);
+ thread.shutdown();
+ }
- for (final StreamThread thread : threads) {
- try {
- if (!thread.isRunning()) {
- thread.join();
- }
- } catch (final InterruptedException ex) {
- Thread.currentThread().interrupt();
+ for (final StreamThread thread : threads) {
+ try {
+ if (!thread.isRunning()) {
+ thread.join();
}
+ } catch (final InterruptedException ex) {
+ Thread.currentThread().interrupt();
}
+ }
- if (globalStreamThread != null) {
- globalStreamThread.setStateListener(null);
- globalStreamThread.shutdown();
- }
+ if (globalStreamThread != null) {
+ globalStreamThread.setStateListener(null);
+ globalStreamThread.shutdown();
+ }
- if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
- try {
- globalStreamThread.join();
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- globalStreamThread = null;
+ if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
+ try {
+ globalStreamThread.join();
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
}
+ globalStreamThread = null;
+ }
- adminClient.close();
+ adminClient.close();
- metrics.close();
- setState(State.NOT_RUNNING);
- }
+ metrics.close();
+ setState(State.NOT_RUNNING);
}, "kafka-streams-close-thread");
shutdownThread.setDaemon(true);
@@ -918,14 +908,12 @@ public class KafkaStreams implements AutoCloseable {
* @param timeout how long to wait for the threads to shutdown
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
- * Note that this method must not be called in the {@link StateListener#onChange(State, State)} callback of {@link StateListener}.
+ * Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
* @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
*/
public synchronized boolean close(final Duration timeout) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
- ApiUtils.validateMillisecondDuration(timeout, msgPrefix);
-
- final long timeoutMs = timeout.toMillis();
+ final long timeoutMs = ApiUtils.validateMillisecondDuration(timeout, msgPrefix);
if (timeoutMs < 0) {
throw new IllegalArgumentException("Timeout can't be negative.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
index 425e272..604cf7f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
@@ -63,8 +63,9 @@ public class KeyValue<K, V> {
@Override
public boolean equals(final Object obj) {
- if (this == obj)
+ if (this == obj) {
return true;
+ }
if (!(obj instanceof KeyValue)) {
return false;
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
index dd3b691..86c9fc5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
@@ -36,8 +36,9 @@ public final class ApiUtils {
*/
public static long validateMillisecondDuration(final Duration duration, final String messagePrefix) {
try {
- if (duration == null)
+ if (duration == null) {
throw new IllegalArgumentException(messagePrefix + "It shouldn't be null.");
+ }
return duration.toMillis();
} catch (final ArithmeticException e) {
@@ -53,8 +54,9 @@ public final class ApiUtils {
*/
public static long validateMillisecondInstant(final Instant instant, final String messagePrefix) {
try {
- if (instant == null)
+ if (instant == null) {
throw new IllegalArgumentException(messagePrefix + "It shouldn't be null.");
+ }
return instant.toEpochMilli();
} catch (final ArithmeticException e) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
index 0af7dbe..667b621 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
@@ -121,7 +121,7 @@ public class Consumed<K, V> {
}
/**
- * Create an instance of {@link Consumed} with a {@link Topology.AutoOffsetReset}.
+ * Create an instance of {@link Consumed} with a {@link org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}.
*
* @param resetPolicy the offset reset policy to be used. If {@code null} the default reset policy from config will be used
* @param <K> key type
@@ -166,7 +166,7 @@ public class Consumed<K, V> {
}
/**
- * Configure the instance of {@link Consumed} with a {@link Topology.AutoOffsetReset}.
+ * Configure the instance of {@link Consumed} with a {@link org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}.
*
* @param resetPolicy the offset reset policy to be used. If {@code null} the default reset policy from config will be used
* @return this
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 8256890..219489f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -77,22 +77,25 @@ public final class JoinWindows extends Windows<Window> {
/** Maximum time difference for tuples that are after the join tuple. */
public final long afterMs;
- private final Duration grace;
+ private final long graceMs;
- private JoinWindows(final long beforeMs, final long afterMs, final Duration grace, final long maintainDurationMs) {
+ private JoinWindows(final long beforeMs,
+ final long afterMs,
+ final long graceMs,
+ final long maintainDurationMs) {
if (beforeMs + afterMs < 0) {
throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
}
this.afterMs = afterMs;
this.beforeMs = beforeMs;
- this.grace = grace;
+ this.graceMs = graceMs;
this.maintainDurationMs = maintainDurationMs;
}
- @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+ @SuppressWarnings("deprecation") // removing segments from Windows will fix this
private JoinWindows(final long beforeMs,
final long afterMs,
- final Duration grace,
+ final long graceMs,
final long maintainDurationMs,
final int segments) {
super(segments);
@@ -101,7 +104,7 @@ public final class JoinWindows extends Windows<Window> {
}
this.afterMs = afterMs;
this.beforeMs = beforeMs;
- this.grace = grace;
+ this.graceMs = graceMs;
this.maintainDurationMs = maintainDurationMs;
}
@@ -117,7 +120,7 @@ public final class JoinWindows extends Windows<Window> {
@Deprecated
public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException {
// This is a static factory method, so we initialize grace and retention to the defaults.
- return new JoinWindows(timeDifferenceMs, timeDifferenceMs, null, DEFAULT_RETENTION_MS);
+ return new JoinWindows(timeDifferenceMs, timeDifferenceMs, -1L, DEFAULT_RETENTION_MS);
}
/**
@@ -128,10 +131,10 @@ public final class JoinWindows extends Windows<Window> {
* @param timeDifference join window interval
* @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
*/
+ @SuppressWarnings("deprecation")
public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
- ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix);
- return of(timeDifference.toMillis());
+ return of(ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix));
}
/**
@@ -145,10 +148,10 @@ public final class JoinWindows extends Windows<Window> {
* @throws IllegalArgumentException if the resulting window size is negative
* @deprecated Use {@link #before(Duration)} instead.
*/
- @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+ @SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Deprecated
public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException {
- return new JoinWindows(timeDifferenceMs, afterMs, grace, maintainDurationMs, segments);
+ return new JoinWindows(timeDifferenceMs, afterMs, graceMs, maintainDurationMs, segments);
}
/**
@@ -161,11 +164,10 @@ public final class JoinWindows extends Windows<Window> {
* @param timeDifference relative window start time
* @throws IllegalArgumentException if the resulting window size is negative or {@code timeDifference} can't be represented as {@code long milliseconds}
*/
- @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+ @SuppressWarnings("deprecation") // removing segments from Windows will fix this
public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
- ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix);
- return before(timeDifference.toMillis());
+ return before(ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix));
}
/**
@@ -179,10 +181,10 @@ public final class JoinWindows extends Windows<Window> {
* @throws IllegalArgumentException if the resulting window size is negative
* @deprecated Use {@link #after(Duration)} instead
*/
- @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+ @SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Deprecated
public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException {
- return new JoinWindows(beforeMs, timeDifferenceMs, grace, maintainDurationMs, segments);
+ return new JoinWindows(beforeMs, timeDifferenceMs, graceMs, maintainDurationMs, segments);
}
/**
@@ -195,11 +197,10 @@ public final class JoinWindows extends Windows<Window> {
* @param timeDifference relative window end time
* @throws IllegalArgumentException if the resulting window size is negative or {@code timeDifference} can't be represented as {@code long milliseconds}
*/
- @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+ @SuppressWarnings("deprecation") // removing segments from Windows will fix this
public JoinWindows after(final Duration timeDifference) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
- ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix);
- return after(timeDifference.toMillis());
+ return after(ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix));
}
/**
@@ -228,14 +229,14 @@ public final class JoinWindows extends Windows<Window> {
* @return this updated builder
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
*/
- @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+ @SuppressWarnings("deprecation") // removing segments from Windows will fix this
public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
- ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
- if (afterWindowEnd.toMillis() < 0) {
+ final long afterWindowEndMs = ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
+ if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
- return new JoinWindows(beforeMs, afterMs, afterWindowEnd, maintainDurationMs, segments);
+ return new JoinWindows(beforeMs, afterMs, afterWindowEndMs, maintainDurationMs, segments);
}
@SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
@@ -244,7 +245,7 @@ public final class JoinWindows extends Windows<Window> {
// NOTE: in the future, when we remove maintainMs,
// we should default the grace period to 24h to maintain the default behavior,
// or we can default to (24h - size) if you want to be super accurate.
- return grace != null ? grace.toMillis() : maintainMs() - size();
+ return graceMs != -1 ? graceMs : maintainMs() - size();
}
/**
@@ -260,7 +261,7 @@ public final class JoinWindows extends Windows<Window> {
if (durationMs < size()) {
throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size.");
}
- return new JoinWindows(beforeMs, afterMs, grace, durationMs, segments);
+ return new JoinWindows(beforeMs, afterMs, graceMs, durationMs, segments);
}
/**
@@ -271,7 +272,7 @@ public final class JoinWindows extends Windows<Window> {
* @return the window maintain duration
* @deprecated since 2.1. This function should not be used anymore as retention period can be specified via {@link Materialized#withRetention(Duration)}.
*/
- @SuppressWarnings("deprecation")
+ @SuppressWarnings({"deprecation", "deprecatedMemberStillInUse"})
@Override
@Deprecated
public long maintainMs() {
@@ -281,29 +282,33 @@ public final class JoinWindows extends Windows<Window> {
@SuppressWarnings({"deprecation", "NonFinalFieldReferenceInEquals"}) // removing segments from Windows will fix this
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final JoinWindows that = (JoinWindows) o;
return beforeMs == that.beforeMs &&
afterMs == that.afterMs &&
maintainDurationMs == that.maintainDurationMs &&
segments == that.segments &&
- Objects.equals(grace, that.grace);
+ graceMs == that.graceMs;
}
@SuppressWarnings({"deprecation", "NonFinalFieldReferencedInHashCode"}) // removing segments from Windows will fix this
@Override
public int hashCode() {
- return Objects.hash(beforeMs, afterMs, grace, maintainDurationMs, segments);
+ return Objects.hash(beforeMs, afterMs, graceMs, maintainDurationMs, segments);
}
- @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+ @SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public String toString() {
return "JoinWindows{" +
"beforeMs=" + beforeMs +
", afterMs=" + afterMs +
- ", grace=" + grace +
+ ", graceMs=" + graceMs +
", maintainDurationMs=" + maintainDurationMs +
", segments=" + segments +
'}';
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 6d83340..b961709 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -387,6 +387,7 @@ public interface KStream<K, V> {
*
* @param action an action to perform on each record
* @see #process(ProcessorSupplier, String...)
+ * @return itself
*/
KStream<K, V> peek(final ForeachAction<? super K, ? super V> action);
@@ -772,8 +773,8 @@ public interface KStream<K, V> {
* {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
* {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka if a later
* operator depends on the newly selected key.
- * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
- * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is
+ * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+ * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
* an internally generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -802,8 +803,8 @@ public interface KStream<K, V> {
* {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
* {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka
* if a later operator depends on the newly selected key.
- * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
- * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is
+ * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+ * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
* an internally generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -863,8 +864,8 @@ public interface KStream<K, V> {
* <p>
* Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a
* later operator depends on the newly selected key.
- * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
- * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is
+ * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+ * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
* an internally generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -892,8 +893,8 @@ public interface KStream<K, V> {
* <p>
* Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a
* later operator depends on the newly selected key.
- * This topic will be as "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
- * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is
+ * This topic will be as "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+ * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
* an internally generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -924,8 +925,8 @@ public interface KStream<K, V> {
* <p>
* Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later
* operator depends on the newly selected key.
- * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
- * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is
+ * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+ * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
* either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -990,9 +991,9 @@ public interface KStream<K, V> {
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
- * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
+ * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
* "-repartition" is a fixed suffix.
*
* <p>
@@ -1068,9 +1069,9 @@ public interface KStream<K, V> {
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
- * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
+ * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
* "-repartition" is a fixed suffix.
* <p>
* Repartitioning can happen for one or both of the joining {@code KStream}s.
@@ -1151,9 +1152,9 @@ public interface KStream<K, V> {
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
- * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
+ * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
* "-repartition" is a fixed suffix.
* <p>
* Repartitioning can happen for one or both of the joining {@code KStream}s.
@@ -1301,7 +1302,7 @@ public interface KStream<K, V> {
* <tr>
* <td>&lt;K2:B&gt;</td>
* <td>&lt;K2:b&gt;</td>
- * <td>&lt;K2:ValueJoiner(null,b)&gt;<br />&lt;K2:ValueJoiner(B,b)&gt;</td>
+ * <td>&lt;K2:ValueJoiner(null,b)&gt;<br></br>&lt;K2:ValueJoiner(B,b)&gt;</td>
* </tr>
* <tr>
* <td></td>
@@ -1383,7 +1384,7 @@ public interface KStream<K, V> {
* <tr>
* <td>&lt;K2:B&gt;</td>
* <td>&lt;K2:b&gt;</td>
- * <td>&lt;K2:ValueJoiner(null,b)&gt;<br />&lt;K2:ValueJoiner(B,b)&gt;</td>
+ * <td>&lt;K2:ValueJoiner(null,b)&gt;<br></br>&lt;K2:ValueJoiner(B,b)&gt;</td>
* </tr>
* <tr>
* <td></td>
@@ -1419,6 +1420,8 @@ public interface KStream<K, V> {
* @param otherStream the {@code KStream} to be joined with this stream
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param windows the specification of the {@link JoinWindows}
+ * @param joined a {@link Joined} instance that defines the serdes to
+ * be used to serialize/deserialize inputs and outputs of the joined streams
* @param <VO> the value type of the other stream
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains join-records for each key and values computed by the given
@@ -1733,10 +1736,12 @@ public interface KStream<K, V> {
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
*
- * @param table the {@link KTable} to be joined with this stream
- * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
- * @param <VT> the value type of the table
- * @param <VR> the value type of the result stream
+ * @param table the {@link KTable} to be joined with this stream
+ * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+ * @param joined a {@link Joined} instance that defines the serdes to
+ * be used to serialize/deserialize inputs and outputs of the joined streams
+ * @param <VT> the value type of the table
+ * @param <VR> the value type of the result stream
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one output for each input {@code KStream} record
* @see #join(KTable, ValueJoiner, Joined)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 3223b75..36cd17b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -562,8 +562,8 @@ public interface KTable<K, V> {
* If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}
* <p>
* Because a new key is selected, an internal repartitioning topic will be created in Kafka.
- * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
- * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is
+ * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+ * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
* an internally generated name, and "-repartition" is a fixed suffix.
*
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -594,8 +594,8 @@ public interface KTable<K, V> {
* If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}
* <p>
* Because a new key is selected, an internal repartitioning topic will be created in Kafka.
- * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
- * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is
+ * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+ * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
* an internally generated name, and "-repartition" is a fixed suffix.
*
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -628,8 +628,8 @@ public interface KTable<K, V> {
* If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}
* <p>
* Because a new key is selected, an internal repartitioning topic will be created in Kafka.
- * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
- * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is
+ * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+ * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
* either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name.
*
* <p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index a0d6e34..15bdf92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -245,14 +245,15 @@ public class Materialized<K, V, S extends StateStore> {
* Note that the retention period must be at least long enough to contain the windowed data's entire life cycle,
* from window-start through window-end, and for the entire grace period.
*
+ * @param retention the retention time
* @return itself
* @throws IllegalArgumentException if retention is negative or can't be represented as {@code long milliseconds}
*/
public Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(retention, "retention");
- ApiUtils.validateMillisecondDuration(retention, msgPrefix);
+ final long retenationMs = ApiUtils.validateMillisecondDuration(retention, msgPrefix);
- if (retention.toMillis() < 0) {
+ if (retenationMs < 0) {
throw new IllegalArgumentException("Retention must not be negative.");
}
this.retention = retention;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
index df9c9a1..a34fe9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.serialization.Serde;
* @param <K> the key type
* @param <V> the value type
*
- * @deprecated since 2.1. Use {@link org.apache.kafka.streams.kstream.Grouped)} instead
+ * @deprecated since 2.1. Use {@link org.apache.kafka.streams.kstream.Grouped} instead
*/
@Deprecated
public class Serialized<K, V> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index bdecd8c..9c77fa5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -74,13 +74,13 @@ public final class SessionWindows {
private final long gapMs;
private final long maintainDurationMs;
- private final Duration grace;
+ private final long graceMs;
- private SessionWindows(final long gapMs, final long maintainDurationMs, final Duration grace) {
+ private SessionWindows(final long gapMs, final long maintainDurationMs, final long graceMs) {
this.gapMs = gapMs;
this.maintainDurationMs = maintainDurationMs;
- this.grace = grace;
+ this.graceMs = graceMs;
}
/**
@@ -97,7 +97,7 @@ public final class SessionWindows {
if (inactivityGapMs <= 0) {
throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
}
- return new SessionWindows(inactivityGapMs, DEFAULT_RETENTION_MS, null);
+ return new SessionWindows(inactivityGapMs, DEFAULT_RETENTION_MS, -1);
}
/**
@@ -108,10 +108,10 @@ public final class SessionWindows {
*
* @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds}
*/
+ @SuppressWarnings("deprecation")
public static SessionWindows with(final Duration inactivityGap) {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
- ApiUtils.validateMillisecondDuration(inactivityGap, msgPrefix);
- return with(inactivityGap.toMillis());
+ return with(ApiUtils.validateMillisecondDuration(inactivityGap, msgPrefix));
}
/**
@@ -131,7 +131,7 @@ public final class SessionWindows {
throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than window gap.");
}
- return new SessionWindows(gapMs, durationMs, grace);
+ return new SessionWindows(gapMs, durationMs, graceMs);
}
/**
@@ -148,16 +148,16 @@ public final class SessionWindows {
*/
public SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
- ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
+ final long afterWindowEndMs = ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
- if (afterWindowEnd.toMillis() < 0) {
+ if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
return new SessionWindows(
gapMs,
maintainDurationMs,
- afterWindowEnd
+ afterWindowEndMs
);
}
@@ -167,7 +167,7 @@ public final class SessionWindows {
// NOTE: in the future, when we remove maintainMs,
// we should default the grace period to 24h to maintain the default behavior,
// or we can default to (24h - gapMs) if you want to be super accurate.
- return grace != null ? grace.toMillis() : maintainMs() - inactivityGap();
+ return graceMs != -1 ? graceMs : maintainMs() - inactivityGap();
}
/**
@@ -195,17 +195,21 @@ public final class SessionWindows {
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final SessionWindows that = (SessionWindows) o;
return gapMs == that.gapMs &&
maintainDurationMs == that.maintainDurationMs &&
- Objects.equals(grace, that.grace);
+ graceMs == that.graceMs;
}
@Override
public int hashCode() {
- return Objects.hash(gapMs, maintainDurationMs, grace);
+ return Objects.hash(gapMs, maintainDurationMs, graceMs);
}
@Override
@@ -213,7 +217,7 @@ public final class SessionWindows {
return "SessionWindows{" +
"gapMs=" + gapMs +
", maintainDurationMs=" + maintainDurationMs +
- ", grace=" + grace +
+ ", graceMs=" + graceMs +
'}';
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index d6f4082..d209b6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -41,7 +41,7 @@ import java.time.Duration;
* The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
* materialized view) that can be queried using the name provided in the {@link Materialized} instance.
*
- * New events are added to windows until their grace period ends (see {@link Windows#grace(Duration)}).
+ * New events are added to windows until their grace period ends (see {@link TimeWindows#grace(Duration)}).
*
* Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
@@ -188,7 +188,6 @@ public interface TimeWindowedKStream<K, V> {
* Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like
* count (c.f. {@link #count()}).
* <p>
- * <p>
* Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
* the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enable the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
@@ -312,7 +311,8 @@ public interface TimeWindowedKStream<K, V> {
*
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
- * @param reducer a {@link Reducer} that computes a new aggregate result
+ * @param reducer a {@link Reducer} that computes a new aggregate result
+ * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
index 6b75419..72cdcb1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
@@ -37,6 +37,7 @@ public class TimeWindowedSerializer<T> implements WindowedSerializer<T> {
private Serializer<T> inner;
// Default constructor needed by Kafka
+ @SuppressWarnings("WeakerAccess")
public TimeWindowedSerializer() {}
public TimeWindowedSerializer(final Serializer<T> inner) {
@@ -50,7 +51,7 @@ public class TimeWindowedSerializer<T> implements WindowedSerializer<T> {
final String propertyName = isKey ? StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS : StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS;
final String value = (String) configs.get(propertyName);
try {
- inner = Serde.class.cast(Utils.newInstance(value, Serde.class)).serializer();
+ inner = Utils.newInstance(value, Serde.class).serializer();
inner.configure(configs, isKey);
} catch (final ClassNotFoundException e) {
throw new ConfigException(propertyName, value, "Serde class " + value + " could not be found.");
@@ -60,8 +61,9 @@ public class TimeWindowedSerializer<T> implements WindowedSerializer<T> {
@Override
public byte[] serialize(final String topic, final Windowed<T> data) {
- if (data == null)
+ if (data == null) {
return null;
+ }
return WindowKeySchema.toBinary(data, inner, topic);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 942b54d..03203f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -60,19 +60,21 @@ public final class TimeWindows extends Windows<TimeWindow> {
private final long maintainDurationMs;
/** The size of the windows in milliseconds. */
+ @SuppressWarnings("WeakerAccess")
public final long sizeMs;
/**
* The size of the window's advance interval in milliseconds, i.e., by how much a window moves forward relative to
* the previous one.
*/
+ @SuppressWarnings("WeakerAccess")
public final long advanceMs;
- private final Duration grace;
+ private final long graceMs;
- private TimeWindows(final long sizeMs, final long advanceMs, final Duration grace, final long maintainDurationMs) {
+ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs, final long maintainDurationMs) {
this.sizeMs = sizeMs;
this.advanceMs = advanceMs;
- this.grace = grace;
+ this.graceMs = graceMs;
this.maintainDurationMs = maintainDurationMs;
}
@@ -81,13 +83,13 @@ public final class TimeWindows extends Windows<TimeWindow> {
@Deprecated
private TimeWindows(final long sizeMs,
final long advanceMs,
- final Duration grace,
+ final long graceMs,
final long maintainDurationMs,
final int segments) {
super(segments);
this.sizeMs = sizeMs;
this.advanceMs = advanceMs;
- this.grace = grace;
+ this.graceMs = graceMs;
this.maintainDurationMs = maintainDurationMs;
}
@@ -110,7 +112,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
}
// This is a static factory method, so we initialize grace and retention to the defaults.
- return new TimeWindows(sizeMs, sizeMs, null, DEFAULT_RETENTION_MS);
+ return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
}
/**
@@ -125,10 +127,10 @@ public final class TimeWindows extends Windows<TimeWindow> {
* @return a new window definition with default maintain duration of 1 day
* @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
*/
+ @SuppressWarnings("deprecation")
public static TimeWindows of(final Duration size) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
- ApiUtils.validateMillisecondDuration(size, msgPrefix);
- return of(size.toMillis());
+ return of(ApiUtils.validateMillisecondDuration(size, msgPrefix));
}
/**
@@ -150,7 +152,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
throw new IllegalArgumentException(String.format("Window advancement interval should be more than zero " +
"and less than window duration which is %d ms, but given advancement interval is: %d ms", sizeMs, advanceMs));
}
- return new TimeWindows(sizeMs, advanceMs, grace, maintainDurationMs, segments);
+ return new TimeWindows(sizeMs, advanceMs, graceMs, maintainDurationMs, segments);
}
/**
@@ -167,8 +169,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
@SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows
public TimeWindows advanceBy(final Duration advance) {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance, "advance");
- ApiUtils.validateMillisecondDuration(advance, msgPrefix);
- return advanceBy(advance.toMillis());
+ return advanceBy(ApiUtils.validateMillisecondDuration(advance, msgPrefix));
}
@Override
@@ -201,12 +202,12 @@ public final class TimeWindows extends Windows<TimeWindow> {
@SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows
public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
- ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
- if (afterWindowEnd.toMillis() < 0) {
+ final long afterWindowEndMs = ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
+ if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
- return new TimeWindows(sizeMs, advanceMs, afterWindowEnd, maintainDurationMs, segments);
+ return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, maintainDurationMs, segments);
}
@SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
@@ -215,7 +216,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
// NOTE: in the future, when we remove maintainMs,
// we should default the grace period to 24h to maintain the default behavior,
// or we can default to (24h - size) if you want to be super accurate.
- return grace != null ? grace.toMillis() : maintainMs() - size();
+ return graceMs != -1 ? graceMs : maintainMs() - size();
}
/**
@@ -233,7 +234,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
if (durationMs < sizeMs) {
throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size.");
}
- return new TimeWindows(sizeMs, advanceMs, grace, durationMs, segments);
+ return new TimeWindows(sizeMs, advanceMs, graceMs, durationMs, segments);
}
/**
@@ -254,20 +255,24 @@ public final class TimeWindows extends Windows<TimeWindow> {
@SuppressWarnings({"deprecation", "NonFinalFieldReferenceInEquals"}) // removing segments from Windows will fix this
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final TimeWindows that = (TimeWindows) o;
return maintainDurationMs == that.maintainDurationMs &&
segments == that.segments &&
sizeMs == that.sizeMs &&
advanceMs == that.advanceMs &&
- Objects.equals(grace, that.grace);
+ graceMs == that.graceMs;
}
@SuppressWarnings({"deprecation", "NonFinalFieldReferencedInHashCode"}) // removing segments from Windows will fix this
@Override
public int hashCode() {
- return Objects.hash(maintainDurationMs, segments, sizeMs, advanceMs, grace);
+ return Objects.hash(maintainDurationMs, segments, sizeMs, advanceMs, graceMs);
}
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
@@ -277,7 +282,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
"maintainDurationMs=" + maintainDurationMs +
", sizeMs=" + sizeMs +
", advanceMs=" + advanceMs +
- ", grace=" + grace +
+ ", graceMs=" + graceMs +
", segments=" + segments +
'}';
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 0a45d81..e1894ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -16,11 +16,11 @@
*/
package org.apache.kafka.streams.kstream;
-import java.time.Instant;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
import org.apache.kafka.streams.processor.TimestampExtractor;
+import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -47,6 +47,7 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
private static final long DEFAULT_START_TIMESTAMP_MS = 0L;
/** The start timestamp of the window. */
+ @SuppressWarnings("WeakerAccess")
public final long startMs;
private UnlimitedWindows(final long startMs) {
@@ -83,10 +84,10 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
* @return a new unlimited window that starts at {@code start}
* @throws IllegalArgumentException if the start time is negative or can't be represented as {@code long milliseconds}
*/
+ @SuppressWarnings("deprecation")
public UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(start, "start");
- ApiUtils.validateMillisecondInstant(start, msgPrefix);
- return startOn(start.toEpochMilli());
+ return startOn(ApiUtils.validateMillisecondInstant(start, msgPrefix));
}
@Override
@@ -148,8 +149,12 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
@SuppressWarnings({"deprecation", "NonFinalFieldReferenceInEquals"}) // removing segments from Windows will fix this
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final UnlimitedWindows that = (UnlimitedWindows) o;
return startMs == that.startMs && segments == that.segments;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index ac49174..432bb45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -16,9 +16,10 @@
*/
package org.apache.kafka.streams.kstream;
-import java.time.Instant;
import org.apache.kafka.streams.processor.TimestampExtractor;
+import java.time.Instant;
+
/**
* A single window instance, defined by its start and end timestamp.
* {@code Window} is agnostic if start/end boundaries are inclusive or exclusive; this is defined by concrete
@@ -64,6 +65,8 @@ public abstract class Window {
/**
* Return the start timestamp of this window.
+ *
+ * @return The start timestamp of this window.
*/
public long start() {
return startMs;
@@ -71,6 +74,8 @@ public abstract class Window {
/**
* Return the end timestamp of this window.
+ *
+ * @return The end timestamp of this window.
*/
public long end() {
return endMs;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
index 7728317..d830f58 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -65,12 +65,12 @@ public class Windowed<K> {
@Override
public boolean equals(final Object obj) {
- if (obj == this)
+ if (obj == this) {
return true;
-
- if (!(obj instanceof Windowed))
+ }
+ if (!(obj instanceof Windowed)) {
return false;
-
+ }
final Windowed<?> that = (Windowed) obj;
return window.equals(that.window) && key.equals(that.key);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CacheFlushListener.java
index 8544164..7cd5a3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CacheFlushListener.java
@@ -20,8 +20,8 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
/**
* Listen to cache flush events
- * @param <K>
- * @param <V>
+ * @param <K> key type
+ * @param <V> value type
*/
public interface CacheFlushListener<K, V> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
index d513f2b..c9a18de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
@@ -35,8 +35,12 @@ public class Change<T> {
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final Change<?> change = (Change<?>) o;
return Objects.equals(newValue, change.newValue) &&
Objects.equals(oldValue, change.oldValue);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index 8a76bad..7fa34b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -56,14 +56,16 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> {
// only one of the old / new values would be not null
if (data.newValue != null) {
- if (data.oldValue != null)
+ if (data.oldValue != null) {
throw new StreamsException("Both old and new values are not null (" + data.oldValue
- + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
+ + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
+ }
serializedKey = inner.serialize(topic, headers, data.newValue);
} else {
- if (data.oldValue == null)
+ if (data.oldValue == null) {
throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
+ }
serializedKey = inner.serialize(topic, headers, data.oldValue);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 8e91469..825db52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -28,7 +28,7 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
private final TransformerSupplier<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> transformerSupplier;
- public KStreamTransform(final TransformerSupplier<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> transformerSupplier) {
+ KStreamTransform(final TransformerSupplier<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> transformerSupplier) {
this.transformerSupplier = transformerSupplier;
}
@@ -41,7 +41,7 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
private final Transformer<? super K1, ? super V1, ? extends KeyValue<? extends K2, ? extends V2>> transformer;
- public KStreamTransformProcessor(final Transformer<? super K1, ? super V1, ? extends KeyValue<? extends K2, ? extends V2>> transformer) {
+ KStreamTransformProcessor(final Transformer<? super K1, ? super V1, ? extends KeyValue<? extends K2, ? extends V2>> transformer) {
this.transformer = transformer;
}
@@ -55,8 +55,9 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
public void process(final K1 key, final V1 value) {
final KeyValue<? extends K2, ? extends V2> pair = transformer.transform(key, value);
- if (pair != null)
+ if (pair != null) {
context().forward(pair.key, pair.value);
+ }
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index d1e524c..1312e2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -54,8 +54,9 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
private V computeValue(final K key, final V value) {
V newValue = null;
- if (value != null && (filterNot ^ predicate.test(key, value)))
+ if (value != null && (filterNot ^ predicate.test(key, value))) {
newValue = value;
+ }
return newValue;
}
@@ -79,8 +80,9 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
final V newValue = computeValue(key, change.newValue);
final V oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;
- if (sendOldValues && oldValue == null && newValue == null)
+ if (sendOldValues && oldValue == null && newValue == null) {
return; // unnecessary to forward here.
+ }
if (queryableName != null) {
store.put(key, newValue);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 2317947..aae1437 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -74,8 +74,9 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
private V1 computeValue(final K key, final V value) {
V1 newValue = null;
- if (value != null)
+ if (value != null) {
newValue = mapper.apply(key, value);
+ }
return newValue;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
index 45e2513..fe961ad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -25,11 +24,6 @@ public class PrintedInternal<K, V> extends Printed<K, V> {
super(printed);
}
- /**
- * Builds the {@link ProcessorSupplier} that will be used to print the records flowing through a {@link KStream}.
- *
- * @return the {@code ProcessorSupplier} to be used for printing
- */
public ProcessorSupplier<K, V> build(final String processorName) {
return new KStreamPrint<>(new PrintForeachAction<>(outputStream, mapper, label != null ? label : processorName));
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
index 40cc880..8111cdf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
@@ -48,7 +48,7 @@ public final class SessionWindow extends Window {
*
* @param other another window
* @return {@code true} if {@code other} overlaps with this window—{@code false} otherwise
- * @throws IllegalArgumentException if the {@code other} window has a different type than {@link this} window
+ * @throws IllegalArgumentException if the {@code other} window has a different type than {@code this} window
*/
public boolean overlap(final Window other) throws IllegalArgumentException {
if (getClass() != other.getClass()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
index 2823cf9..ac71282 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
@@ -55,7 +55,7 @@ public class TimeWindow extends Window {
*
* @param other another window
* @return {@code true} if {@code other} overlaps with this window—{@code false} otherwise
- * @throws IllegalArgumentException if the {@code other} window has a different type than {@link this} window
+ * @throws IllegalArgumentException if the {@code other} window has a different type than {@code this} window
*/
@Override
public boolean overlap(final Window other) throws IllegalArgumentException {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
index b3f8c3c..12125ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
@@ -51,7 +51,7 @@ public class UnlimitedWindow extends Window {
*
* @param other another window
* @return {@code true}
- * @throws IllegalArgumentException if the {@code other} window has a different type than {@link this} window
+ * @throws IllegalArgumentException if the {@code other} window has a different type than {@code this} window
*/
@Override
public boolean overlap(final Window other) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
index 161f934..90456d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
@@ -57,8 +57,12 @@ public class EagerBufferConfigImpl extends BufferConfigInternal {
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o;
return maxRecords == that.maxRecords &&
maxBytes == that.maxBytes;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
index 4bab6d3..af1e0d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
@@ -48,8 +48,12 @@ public class FinalResultsSuppressionBuilder<K extends Windowed> implements Suppr
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final FinalResultsSuppressionBuilder<?> that = (FinalResultsSuppressionBuilder<?>) o;
return Objects.equals(name, that.name) &&
Objects.equals(bufferConfig, that.bufferConfig);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
index ef754ec..30427b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
@@ -69,8 +69,12 @@ public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.Stri
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final StrictBufferConfigImpl that = (StrictBufferConfigImpl) o;
return maxRecords == that.maxRecords &&
maxBytes == that.maxBytes &&
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
index 7453475..042a81a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
@@ -71,8 +71,12 @@ public class SuppressedInternal<K> implements Suppressed<K> {
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final SuppressedInternal<?> that = (SuppressedInternal<?>) o;
return suppressTombstones == that.suppressTombstones &&
Objects.equals(name, that.name) &&
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
index f91f22f..c10af9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
@@ -16,9 +16,10 @@
*/
package org.apache.kafka.streams.processor;
-import java.time.Duration;
import org.apache.kafka.common.annotation.InterfaceStability;
+import java.time.Duration;
+
/**
* A processor of key-value pair records.
*
@@ -31,7 +32,7 @@ public interface Processor<K, V> {
/**
* Initialize this processor with the given context. The framework ensures this is called once per processor when the topology
* that contains it is initialized. When the framework is done with the processor, {@link #close()} will be called on it; the
- * framework may later re-use the processor by calling {@link #init()} again.
+ * framework may later re-use the processor by calling {@code #init()} again.
* <p>
* The provided {@link ProcessorContext context} can be used to access topology and record meta data, to
* {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator) schedule} a method to be
@@ -50,9 +51,9 @@ public interface Processor<K, V> {
void process(K key, V value);
/**
- * Close this processor and clean up any resources. Be aware that {@link #close()} is called after an internal cleanup.
+ * Close this processor and clean up any resources. Be aware that {@code #close()} is called after an internal cleanup.
* Thus, it is not possible to write anything to Kafka as underlying clients are already closed. The framework may
- * later re-use this processor by calling {@link #init()} on it again.
+ * later re-use this processor by calling {@code #init()} on it again.
* <p>
* Note: Do not close any streams managed resources, like {@link StateStore}s here, as they are managed by the library.
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index 79919a6..9d1b82c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -47,7 +47,9 @@ public class TaskId implements Comparable<TaskId> {
*/
public static TaskId parse(final String taskIdStr) {
final int index = taskIdStr.indexOf('_');
- if (index <= 0 || index + 1 >= taskIdStr.length()) throw new TaskIdFormatException(taskIdStr);
+ if (index <= 0 || index + 1 >= taskIdStr.length()) {
+ throw new TaskIdFormatException(taskIdStr);
+ }
try {
final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, index));
@@ -85,8 +87,9 @@ public class TaskId implements Comparable<TaskId> {
@Override
public boolean equals(final Object o) {
- if (this == o)
+ if (this == o) {
return true;
+ }
if (o instanceof TaskId) {
final TaskId other = (TaskId) o;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index e7dd4db..f3e5160 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -189,13 +189,13 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
return task.schedule(interval, type, callback);
}
+ @SuppressWarnings("deprecation")
@Override
public Cancellable schedule(final Duration interval,
final PunctuationType type,
final Punctuator callback) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, "interval");
- ApiUtils.validateMillisecondDuration(interval, msgPrefix);
- return schedule(interval.toMillis(), type, callback);
+ return schedule(ApiUtils.validateMillisecondDuration(interval, msgPrefix), type, callback);
}
void setStreamTimeSupplier(final TimestampSupplier streamTimeSupplier) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index cd4657b..7e1466e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -98,8 +98,12 @@ public class ProcessorRecordContext implements RecordContext {
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final ProcessorRecordContext that = (ProcessorRecordContext) o;
return timestamp == that.timestamp &&
offset == that.offset &&
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
index 3cd3e90..f91bdcc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
@@ -38,8 +38,9 @@ public class QuickUnion<T> {
T current = id;
T parent = ids.get(current);
- if (parent == null)
+ if (parent == null) {
throw new NoSuchElementException("id: " + id.toString());
+ }
while (!parent.equals(current)) {
// do the path splitting
@@ -53,7 +54,7 @@ public class QuickUnion<T> {
}
@SuppressWarnings("unchecked")
- public void unite(final T id1, final T... idList) {
+ void unite(final T id1, final T... idList) {
for (final T id2 : idList) {
unitePair(id1, id2);
}
@@ -63,8 +64,9 @@ public class QuickUnion<T> {
final T root1 = root(id1);
final T root2 = root(id2);
- if (!root1.equals(root2))
+ if (!root1.equals(root2)) {
ids.put(root1, root2);
+ }
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
index ca8fbff..a7a8d88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
@@ -40,7 +40,7 @@ public class RepartitionTopicConfig extends InternalTopicConfig {
REPARTITION_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides);
}
- public RepartitionTopicConfig(final String name, final Map<String, String> topicConfigs) {
+ RepartitionTopicConfig(final String name, final Map<String, String> topicConfigs) {
super(name, topicConfigs);
}
@@ -64,8 +64,12 @@ public class RepartitionTopicConfig extends InternalTopicConfig {
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final RepartitionTopicConfig that = (RepartitionTopicConfig) o;
return Objects.equals(name, that.name) &&
Objects.equals(topicConfigs, that.topicConfigs);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index d7627fe..87505ca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -67,15 +67,18 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
this.context = context;
// if deserializers are null, get the default ones from the context
- if (this.keyDeserializer == null)
+ if (this.keyDeserializer == null) {
this.keyDeserializer = (Deserializer<K>) context.keySerde().deserializer();
- if (this.valDeserializer == null)
+ }
+ if (this.valDeserializer == null) {
this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
+ }
// if value deserializers are for {@code Change} values, set the inner deserializer when necessary
if (this.valDeserializer instanceof ChangedDeserializer &&
- ((ChangedDeserializer) this.valDeserializer).inner() == null)
+ ((ChangedDeserializer) this.valDeserializer).inner() == null) {
((ChangedDeserializer) this.valDeserializer).setInner(context.valueSerde().deserializer());
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
index ba90558..cbbb244 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
@@ -23,7 +23,7 @@ public class Stamped<V> implements Comparable {
public final V value;
public final long timestamp;
- public Stamped(final V value, final long timestamp) {
+ Stamped(final V value, final long timestamp) {
this.value = value;
this.timestamp = timestamp;
}
@@ -32,18 +32,21 @@ public class Stamped<V> implements Comparable {
public int compareTo(final Object other) {
final long otherTimestamp = ((Stamped<?>) other).timestamp;
- if (timestamp < otherTimestamp) return -1;
- else if (timestamp > otherTimestamp) return 1;
+ if (timestamp < otherTimestamp) {
+ return -1;
+ } else if (timestamp > otherTimestamp) {
+ return 1;
+ }
return 0;
}
@Override
public boolean equals(final Object other) {
-
- if (other == null || getClass() != other.getClass()) return false;
-
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
final long otherTimestamp = ((Stamped<?>) other).timestamp;
- return Long.compare(timestamp, otherTimestamp) == 0;
+ return timestamp == otherTimestamp;
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 493f56b..2f649bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -304,10 +304,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
final String host = getHost(userEndPoint);
final Integer port = getPort(userEndPoint);
- if (host == null || port == null)
+ if (host == null || port == null) {
throw new ConfigException(String.format("%s Config %s isn't in the correct format. Expected a host:port pair" +
- " but received %s",
- logPrefix, StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
+ " but received %s",
+ logPrefix, StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
+ }
} catch (final NumberFormatException nfe) {
throw new ConfigException(String.format("%s Invalid port supplied in %s for config %s",
logPrefix, userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
@@ -348,9 +349,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
return new Subscription(new ArrayList<>(topics), data.encode());
}
- Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
- final String topic,
- final int errorCode) {
+ private Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
+ final String topic,
+ final int errorCode) {
log.error("{} is unknown yet during rebalance," +
" please make sure they have been pre-created before starting the Streams application.", topic);
final Map<String, Assignment> assignment = new HashMap<>();
@@ -598,8 +599,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
int numPartitions = UNKNOWN;
if (tasksByTopicGroup.get(topicGroupId) != null) {
for (final TaskId task : tasksByTopicGroup.get(topicGroupId)) {
- if (numPartitions < task.partition + 1)
+ if (numPartitions < task.partition + 1) {
numPartitions = task.partition + 1;
+ }
}
final InternalTopicMetadata topicMetadata = new InternalTopicMetadata(topicConfig);
topicMetadata.numPartitions = numPartitions;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java
index 4606a56..e55ce71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java
@@ -35,7 +35,7 @@ public class UnwindowedChangelogTopicConfig extends InternalTopicConfig {
UNWINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides);
}
- public UnwindowedChangelogTopicConfig(final String name, final Map<String, String> topicConfigs) {
+ UnwindowedChangelogTopicConfig(final String name, final Map<String, String> topicConfigs) {
super(name, topicConfigs);
}
@@ -59,8 +59,12 @@ public class UnwindowedChangelogTopicConfig extends InternalTopicConfig {
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final UnwindowedChangelogTopicConfig that = (UnwindowedChangelogTopicConfig) o;
return Objects.equals(name, that.name) &&
Objects.equals(topicConfigs, that.topicConfigs);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java
index d42642a..e177bea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java
@@ -37,7 +37,7 @@ public class WindowedChangelogTopicConfig extends InternalTopicConfig {
private Long retentionMs;
- public WindowedChangelogTopicConfig(final String name, final Map<String, String> topicConfigs) {
+ WindowedChangelogTopicConfig(final String name, final Map<String, String> topicConfigs) {
super(name, topicConfigs);
}
@@ -71,8 +71,12 @@ public class WindowedChangelogTopicConfig extends InternalTopicConfig {
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final WindowedChangelogTopicConfig that = (WindowedChangelogTopicConfig) o;
return Objects.equals(name, that.name) &&
Objects.equals(topicConfigs, that.topicConfigs) &&
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index 66e655f..6eff7bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -37,7 +37,7 @@ public class ClientState {
}
ClientState(final int capacity) {
- this(new HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(), capacity);
+ this(new HashSet<>(), new HashSet<>(), new HashSet<>(), new HashSet<>(), new HashSet<>(), new HashSet<>(), capacity);
}
private ClientState(final Set<TaskId> activeTasks,
@@ -93,6 +93,7 @@ public class ClientState {
return prevStandbyTasks;
}
+ @SuppressWarnings("WeakerAccess")
public int assignedTaskCount() {
return assignedTasks.size();
}
@@ -101,6 +102,7 @@ public class ClientState {
capacity++;
}
+ @SuppressWarnings("WeakerAccess")
public int activeTaskCount() {
return activeTasks.size();
}
@@ -143,12 +145,13 @@ public class ClientState {
final double otherLoad = (double) other.assignedTaskCount() / other.capacity;
final double thisLoad = (double) assignedTaskCount() / capacity;
- if (thisLoad < otherLoad)
+ if (thisLoad < otherLoad) {
return true;
- else if (thisLoad > otherLoad)
+ } else if (thisLoad > otherLoad) {
return false;
- else
+ } else {
return capacity > other.capacity;
+ }
}
Set<TaskId> previousStandbyTasks() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index 69c2ba2..157497d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -296,8 +296,12 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final Pair pair = (Pair) o;
return Objects.equals(task1, pair.task1) &&
Objects.equals(task2, pair.task2);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index dd6cc4a..20c0be9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -267,8 +267,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
}
- for (int i = 0; i < tags.length; i += 2)
+ for (int i = 0; i < tags.length; i += 2) {
tagMap.put(tags[i], tags[i + 1]);
+ }
}
tagMap.put("client-id", threadName);
return tagMap;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
index ee645b1..0c4ea12 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.utils.Bytes;
/**
- * A store supplier that can be used to create one or more {@link KeyValueStore KeyValueStore<Bytes, byte[]>} instances of type <Byte, byte[]>.
+ * A store supplier that can be used to create one or more {@link KeyValueStore KeyValueStore<Byte, byte[]>} instances of type <Byte, byte[]>.
*
- * For any stores implementing the {@link KeyValueStore KeyValueStore<Bytes, byte[]>} interface, null value bytes are considered as "not exist". This means:
+ * For any stores implementing the {@link KeyValueStore KeyValueStore<Byte, byte[]>} interface, null value bytes are considered as "not exist". This means:
*
* 1. Null value bytes in put operations should be treated as delete.
* 2. If the key does not exist, get operations should return null value bytes.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
index 6ba6672..3d4f55c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
@@ -43,7 +43,7 @@ public interface QueryableStoreType<T> {
*
* @param storeProvider provides access to all the underlying StateStore instances
* @param storeName The name of the Store
- * @return a read-only interface over a {@code StateStore} (cf. {@link QueryableStoreTypes.KeyValueStoreType})
+ * @return a read-only interface over a {@code StateStore} (cf. {@link org.apache.kafka.streams.state.QueryableStoreTypes.KeyValueStoreType})
*/
T create(final StateStoreProvider storeProvider, final String storeName);
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
index f7a9970..d33e324 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -74,8 +74,7 @@ public class QueryableStoreTypes {
}
}
- private static class KeyValueStoreType<K, V> extends
- QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
+ static class KeyValueStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
KeyValueStoreType() {
super(ReadOnlyKeyValueStore.class);
}
@@ -88,7 +87,7 @@ public class QueryableStoreTypes {
}
- private static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {
+ static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {
WindowStoreType() {
super(ReadOnlyWindowStore.class);
}
@@ -100,7 +99,7 @@ public class QueryableStoreTypes {
}
}
- private static class SessionStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySessionStore<K, V>> {
+ static class SessionStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySessionStore<K, V>> {
SessionStoreType() {
super(ReadOnlySessionStore.class);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
index 0804338..5393fe1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
@@ -16,10 +16,11 @@
*/
package org.apache.kafka.streams.state;
-import java.time.Instant;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Windowed;
+import java.time.Instant;
+
/**
* A window store that only supports read operations
* Implementations should be thread-safe as concurrent reads and writes are expected.
@@ -47,7 +48,6 @@ public interface ReadOnlyWindowStore<K, V> {
* <p>
* The time range is inclusive and applies to the starting timestamp of the window.
* For example, if we have the following windows:
- * <p>
* <pre>
* +-------------------------------+
* | key | start time | end time |
@@ -62,7 +62,7 @@ public interface ReadOnlyWindowStore<K, V> {
* +--------------------------------
* </pre>
* And we call {@code store.fetch("A", 10, 20)} then the results will contain the first
- * three windows from the table above, i.e., all those where 10 <= start time <= 20.
+ * three windows from the table above, i.e., all those where 10 &lt;= start time &lt;= 20.
* <p>
* For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest
* available window to the newest/latest window.
@@ -73,7 +73,7 @@ public interface ReadOnlyWindowStore<K, V> {
* @return an iterator over key-value pairs {@code <timestamp, value>}
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException If {@code null} is used for key.
- * @deprecated Use {@link #fetch(K, Instant, Instant)} instead
+ * @deprecated Use {@link #fetch(Object, Instant, Instant)} instead
*/
@Deprecated
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
@@ -85,7 +85,6 @@ public interface ReadOnlyWindowStore<K, V> {
* <p>
* The time range is inclusive and applies to the starting timestamp of the window.
* For example, if we have the following windows:
- * <p>
* <pre>
* +-------------------------------+
* | key | start time | end time |
@@ -100,14 +99,14 @@ public interface ReadOnlyWindowStore<K, V> {
* +--------------------------------
* </pre>
* And we call {@code store.fetch("A", Instant.ofEpochMilli(10), Instant.ofEpochMilli(20))} then the results will contain the first
- * three windows from the table above, i.e., all those where 10 <= start time <= 20.
+ * three windows from the table above, i.e., all those where 10 &lt;= start time &lt;= 20.
* <p>
* For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest
* available window to the newest/latest window.
*
* @param key the key to fetch
* @param from time range start (inclusive)
- * @param from time range end (inclusive)
+ * @param to time range end (inclusive)
* @return an iterator over key-value pairs {@code <timestamp, value>}
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException If {@code null} is used for key.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 7991b0d..8236c1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -198,13 +198,13 @@ public class Stores {
final boolean retainDuplicates) throws IllegalArgumentException {
Objects.requireNonNull(name, "name cannot be null");
final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
- ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
+ final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
- ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
+ final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
- final long defaultSegmentInterval = Math.max(retentionPeriod.toMillis() / 2, 60_000L);
+ final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);
- return persistentWindowStore(name, retentionPeriod.toMillis(), windowSize.toMillis(), retainDuplicates, defaultSegmentInterval);
+ return persistentWindowStore(name, retentionMs, windowSizeMs, retainDuplicates, defaultSegmentInterval);
}
private static WindowBytesStoreSupplier persistentWindowStore(final String name,
@@ -264,8 +264,7 @@ public class Stores {
public static SessionBytesStoreSupplier persistentSessionStore(final String name,
final Duration retentionPeriod) {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
- ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix);
- return persistentSessionStore(name, retentionPeriod.toMillis());
+ return persistentSessionStore(name, ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix));
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
index 025f36c..a14b211 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
@@ -37,8 +37,8 @@ public class StreamsMetadata {
* operations.
*/
public final static StreamsMetadata NOT_AVAILABLE = new StreamsMetadata(new HostInfo("unavailable", -1),
- Collections.<String>emptySet(),
- Collections.<TopicPartition>emptySet());
+ Collections.emptySet(),
+ Collections.emptySet());
private final HostInfo hostInfo;
private final Set<String> stateStoreNames;
@@ -68,19 +68,26 @@ public class StreamsMetadata {
public String host() {
return hostInfo.host();
}
+
public int port() {
return hostInfo.port();
}
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final StreamsMetadata that = (StreamsMetadata) o;
-
- if (!hostInfo.equals(that.hostInfo)) return false;
- if (!stateStoreNames.equals(that.stateStoreNames)) return false;
+ if (!hostInfo.equals(that.hostInfo)) {
+ return false;
+ }
+ if (!stateStoreNames.equals(that.stateStoreNames)) {
+ return false;
+ }
return topicPartitions.equals(that.topicPartitions);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
index c071b34..10b96e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.utils.Bytes;
/**
- * A store supplier that can be used to create one or more {@link WindowStore WindowStore<Bytes, byte[]>} instances of type <Byte, byte[]>.
+ * A store supplier that can be used to create one or more {@link WindowStore WindowStore&lt;Byte, byte[]&gt;} instances of type &lt;Byte, byte[]&gt;.
*
- * For any stores implementing the {@link WindowStore WindowStore<Bytes, byte[]>} interface, null value bytes are considered as "not exist". This means:
+ * For any stores implementing the {@link WindowStore WindowStore&lt;Byte, byte[]&gt;} interface, null value bytes are considered as "not exist". This means:
*
* 1. Null value bytes in put operations should be treated as delete.
* 2. Null value bytes should never be returned in range query results.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index cf5744b..fcbd004 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -63,7 +63,6 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
* <p>
* The time range is inclusive and applies to the starting timestamp of the window.
* For example, if we have the following windows:
- * <p>
* <pre>
* +-------------------------------+
* | key | start time | end time |
@@ -78,7 +77,7 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
* +--------------------------------
* </pre>
* And we call {@code store.fetch("A", 10, 20)} then the results will contain the first
- * three windows from the table above, i.e., all those where 10 <= start time <= 20.
+ * three windows from the table above, i.e., all those where 10 &lt;= start time &lt;= 20.
* <p>
* For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest
* available window to the newest/latest window.
@@ -95,9 +94,10 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
@Override
default WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) {
- ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
- ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
- return fetch(key, from.toEpochMilli(), to.toEpochMilli());
+ return fetch(
+ key,
+ ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
+ ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
}
/**
@@ -118,9 +118,11 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
@Override
default KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) {
- ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime"));
- ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime"));
- return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
+ return fetch(
+ from,
+ to,
+ ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime")),
+ ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime")));
}
/**
@@ -137,8 +139,8 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
@Override
default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) {
- ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
- ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
- return fetchAll(from.toEpochMilli(), to.toEpochMilli());
+ return fetchAll(
+ ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
+ ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index fa37f5e..0908085 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -16,15 +16,15 @@
*/
package org.apache.kafka.streams.state.internals;
-import java.time.Instant;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
+import java.time.Instant;
import java.util.List;
import java.util.Objects;
@@ -89,11 +89,13 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
return KeyValueIterators.emptyWindowStoreIterator();
}
+ @SuppressWarnings("deprecation")
@Override
public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
- ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
- return fetch(key, from.toEpochMilli(), to.toEpochMilli());
+ return fetch(
+ key,
+ ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
+ ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
}
@SuppressWarnings("deprecation")
@@ -101,12 +103,7 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
Objects.requireNonNull(from, "from can't be null");
Objects.requireNonNull(to, "to can't be null");
- final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = new NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>>() {
- @Override
- public KeyValueIterator<Windowed<K>, V> apply(final ReadOnlyWindowStore<K, V> store) {
- return store.fetch(from, to, timeFrom, timeTo);
- }
- };
+ final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = store -> store.fetch(from, to, timeFrom, timeTo);
return new DelegatingPeekingKeyValueIterator<>(storeName,
new CompositeKeyValueIterator<>(
provider.stores(storeName, windowStoreType).iterator(),
@@ -115,44 +112,37 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime"));
- ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime"));
- return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
+ return fetch(
+ from,
+ to,
+ ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime")),
+ ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime")));
}
@Override
public KeyValueIterator<Windowed<K>, V> all() {
- final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = new NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>>() {
- @Override
- public KeyValueIterator<Windowed<K>, V> apply(final ReadOnlyWindowStore<K, V> store) {
- return store.all();
- }
- };
+ final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = ReadOnlyWindowStore::all;
return new DelegatingPeekingKeyValueIterator<>(storeName,
new CompositeKeyValueIterator<>(
provider.stores(storeName, windowStoreType).iterator(),
nextIteratorFunction));
}
-
+
@Override
@Deprecated
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
- final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = new NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>>() {
- @Override
- public KeyValueIterator<Windowed<K>, V> apply(final ReadOnlyWindowStore<K, V> store) {
- return store.fetchAll(timeFrom, timeTo);
- }
- };
+ final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = store -> store.fetchAll(timeFrom, timeTo);
return new DelegatingPeekingKeyValueIterator<>(storeName,
new CompositeKeyValueIterator<>(
provider.stores(storeName, windowStoreType).iterator(),
nextIteratorFunction));
}
+ @SuppressWarnings("deprecation")
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
- ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
- return fetchAll(from.toEpochMilli(), to.toEpochMilli());
+ return fetchAll(
+ ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
+ ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
index 89935c0..870eeee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
@@ -38,14 +38,18 @@ public class ContextualRecord {
return value;
}
- public long sizeBytes() {
+ long sizeBytes() {
return (value == null ? 0 : value.length) + recordContext.sizeBytes();
}
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final ContextualRecord that = (ContextualRecord) o;
return Arrays.equals(value, that.value) &&
Objects.equals(recordContext, that.recordContext);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 234ea05..65f5388 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -141,8 +141,12 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final BufferKey bufferKey = (BufferKey) o;
return time == bufferKey.time &&
Objects.equals(key, bufferKey.key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index 5343635..f454862 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -75,8 +75,12 @@ class LRUCacheEntry {
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
final LRUCacheEntry that = (LRUCacheEntry) o;
return sizeBytes == that.sizeBytes &&
isDirty() == that.isDirty() &&
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 582eb46..80078b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -108,18 +107,15 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
// register the store
- context.register(root, new StateRestoreCallback() {
- @Override
- public void restore(final byte[] key, final byte[] value) {
- restoring = true;
- // check value for null, to avoid deserialization error.
- if (value == null) {
- delete(serdes.keyFrom(key));
- } else {
- put(serdes.keyFrom(key), serdes.valueFrom(value));
- }
- restoring = false;
+ context.register(root, (key, value) -> {
+ restoring = true;
+ // check value for null, to avoid deserialization error.
+ if (value == null) {
+ delete(serdes.keyFrom(key));
+ } else {
+ put(serdes.keyFrom(key), serdes.valueFrom(value));
}
+ restoring = false;
});
}
@@ -162,8 +158,9 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
@Override
public void putAll(final List<KeyValue<K, V>> entries) {
- for (final KeyValue<K, V> entry : entries)
+ for (final KeyValue<K, V> entry : entries) {
put(entry.key, entry.value);
+ }
}
@Override
@@ -173,7 +170,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
}
/**
- * @throws UnsupportedOperationException
+ * @throws UnsupportedOperationException at every invocation
*/
@Override
public KeyValueIterator<K, V> range(final K from, final K to) {
@@ -181,7 +178,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
}
/**
- * @throws UnsupportedOperationException
+ * @throws UnsupportedOperationException at every invocation
*/
@Override
public KeyValueIterator<K, V> all() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index bf748fc..8c28115 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -78,7 +78,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
private final String name;
private final String parentDir;
- private final Set<KeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<KeyValueIterator>());
+ private final Set<KeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<>());
File dbDir;
private RocksDB db;
@@ -516,10 +516,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
if (next == null) {
return allDone();
} else {
- if (comparator.compare(next.key.get(), rawToKey) <= 0)
+ if (comparator.compare(next.key.get(), rawToKey) <= 0) {
return next;
- else
+ } else {
return allDone();
+ }
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index f95a104..f80ffa3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -54,10 +54,11 @@ class Segment extends RocksDBStore implements Comparable<Segment> {
@Override
public boolean equals(final Object obj) {
- if (obj == null || getClass() != obj.getClass()) return false;
-
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
final Segment segment = (Segment) obj;
- return Long.compare(id, segment.id) == 0;
+ return id == segment.id;
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index cf858b0..d4aedce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -110,8 +110,9 @@ class Segments {
final String[] list = dir.list();
if (list != null) {
final long[] segmentIds = new long[list.length];
- for (int i = 0; i < list.length; i++)
+ for (int i = 0; i < list.length; i++) {
segmentIds[i] = segmentIdFromSegmentName(list[i], dir);
+ }
// open segments in the id order
Arrays.sort(segmentIds);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 0c1a99d..a3baebb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.BeforeClass;
@@ -74,8 +73,9 @@ public abstract class AbstractJoinIntegrationTest {
@Parameterized.Parameters(name = "caching enabled = {0}")
public static Collection<Object[]> data() {
final List<Object[]> values = new ArrayList<>();
- for (final boolean cacheEnabled : Arrays.asList(true, false))
- values.add(new Object[] {cacheEnabled});
+ for (final boolean cacheEnabled : Arrays.asList(true, false)) {
+ values.add(new Object[]{cacheEnabled});
+ }
return values;
}
@@ -99,29 +99,24 @@ public abstract class AbstractJoinIntegrationTest {
AtomicBoolean finalResultReached = new AtomicBoolean(false);
private final List<Input<String>> input = Arrays.asList(
- new Input<>(INPUT_TOPIC_LEFT, (String) null),
- new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+ new Input<>(INPUT_TOPIC_LEFT, null),
+ new Input<>(INPUT_TOPIC_RIGHT, null),
new Input<>(INPUT_TOPIC_LEFT, "A"),
new Input<>(INPUT_TOPIC_RIGHT, "a"),
new Input<>(INPUT_TOPIC_LEFT, "B"),
new Input<>(INPUT_TOPIC_RIGHT, "b"),
- new Input<>(INPUT_TOPIC_LEFT, (String) null),
- new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+ new Input<>(INPUT_TOPIC_LEFT, null),
+ new Input<>(INPUT_TOPIC_RIGHT, null),
new Input<>(INPUT_TOPIC_LEFT, "C"),
new Input<>(INPUT_TOPIC_RIGHT, "c"),
- new Input<>(INPUT_TOPIC_RIGHT, (String) null),
- new Input<>(INPUT_TOPIC_LEFT, (String) null),
- new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+ new Input<>(INPUT_TOPIC_RIGHT, null),
+ new Input<>(INPUT_TOPIC_LEFT, null),
+ new Input<>(INPUT_TOPIC_RIGHT, null),
new Input<>(INPUT_TOPIC_RIGHT, "d"),
new Input<>(INPUT_TOPIC_LEFT, "D")
);
- final ValueJoiner<String, String, String> valueJoiner = new ValueJoiner<String, String, String>() {
- @Override
- public String apply(final String value1, final String value2) {
- return value1 + "-" + value2;
- }
- };
+ final ValueJoiner<String, String, String> valueJoiner = (value1, value2) -> value1 + "-" + value2;
final boolean cacheEnabled;
@@ -154,8 +149,9 @@ public abstract class AbstractJoinIntegrationTest {
void prepareEnvironment() throws InterruptedException {
CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
- if (!cacheEnabled)
+ if (!cacheEnabled) {
STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ }
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
@@ -247,12 +243,7 @@ public abstract class AbstractJoinIntegrationTest {
producer.send(new ProducerRecord<>(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get();
}
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return finalResultReached.get();
- }
- }, "Never received expected final result.");
+ TestUtils.waitForCondition(() -> finalResultReached.get(), "Never received expected final result.");
checkResult(OUTPUT_TOPIC, expectedFinalResult, numRecordsExpected);
@@ -268,7 +259,7 @@ public abstract class AbstractJoinIntegrationTest {
* Checks the embedded queryable state store snapshot
*/
private void checkQueryableStore(final String queryableName, final String expectedFinalResult) {
- final ReadOnlyKeyValueStore<Long, String> store = streams.store(queryableName, QueryableStoreTypes.<Long, String>keyValueStore());
+ final ReadOnlyKeyValueStore<Long, String> store = streams.store(queryableName, QueryableStoreTypes.keyValueStore());
final KeyValueIterator<Long, String> all = store.all();
final KeyValue<Long, String> onlyEntry = all.next();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index be87eb2..2873593 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -141,9 +141,8 @@ public class RegexSourceIntegrationTest {
@Test
public void testRegexMatchesTopicsAWhenCreated() throws Exception {
-
final Serde<String> stringSerde = Serdes.String();
- final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1");
+ final List<String> expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1");
final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
CLUSTER.createTopic("TEST-TOPIC-1");
@@ -167,23 +166,12 @@ public class RegexSourceIntegrationTest {
}
});
-
streams.start();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return assignedTopics.equals(expectedFirstAssignment);
- }
- }, STREAM_TASKS_NOT_UPDATED);
+ TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
CLUSTER.createTopic("TEST-TOPIC-2");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return assignedTopics.equals(expectedSecondAssignment);
- }
- }, STREAM_TASKS_NOT_UPDATED);
+ TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
}
@@ -192,7 +180,7 @@ public class RegexSourceIntegrationTest {
final Serde<String> stringSerde = Serdes.String();
final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
- final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B");
+ final List<String> expectedSecondAssignment = Collections.singletonList("TEST-TOPIC-B");
CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");
@@ -218,21 +206,11 @@ public class RegexSourceIntegrationTest {
streams.start();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return assignedTopics.equals(expectedFirstAssignment);
- }
- }, STREAM_TASKS_NOT_UPDATED);
+ TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
CLUSTER.deleteTopic("TEST-TOPIC-A");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return assignedTopics.equals(expectedSecondAssignment);
- }
- }, STREAM_TASKS_NOT_UPDATED);
+ TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
}
@Test
@@ -252,13 +230,10 @@ public class RegexSourceIntegrationTest {
try {
streams.start();
- final TestCondition stateStoreNameBoundToSourceTopic = new TestCondition() {
- @Override
- public boolean conditionMet() {
- final Map<String, List<String>> stateStoreToSourceTopic = topology.getInternalBuilder().stateStoreNameToSourceTopics();
- final List<String> topicNamesList = stateStoreToSourceTopic.get("testStateStore");
- return topicNamesList != null && !topicNamesList.isEmpty() && topicNamesList.get(0).equals("topic-1");
- }
+ final TestCondition stateStoreNameBoundToSourceTopic = () -> {
+ final Map<String, List<String>> stateStoreToSourceTopic = topology.getInternalBuilder().stateStoreNameToSourceTopics();
+ final List<String> topicNamesList = stateStoreToSourceTopic.get("testStateStore");
+ return topicNamesList != null && !topicNamesList.isEmpty() && topicNamesList.get(0).equals("topic-1");
};
TestUtils.waitForCondition(stateStoreNameBoundToSourceTopic, thirtySecondTimeout, "Did not find topic: [topic-1] connected to state store: [testStateStore]");
@@ -296,12 +271,12 @@ public class RegexSourceIntegrationTest {
final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
- IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig, mockTime);
- IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig, mockTime);
- IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig, mockTime);
- IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig, mockTime);
- IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig, mockTime);
- IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Collections.singleton(topic1TestMessage), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Collections.singleton(topic2TestMessage), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Collections.singleton(topicATestMessage), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Collections.singleton(topicCTestMessage), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Collections.singleton(topicYTestMessage), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Collections.singleton(topicZTestMessage), producerConfig, mockTime);
final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
@@ -364,15 +339,9 @@ public class RegexSourceIntegrationTest {
}
});
-
partitionedStreamsLeader.start();
partitionedStreamsFollower.start();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return followerAssignment.equals(expectedAssignment) && leaderAssignment.equals(expectedAssignment);
- }
- }, "topic assignment not completed");
+ TestUtils.waitForCondition(() -> followerAssignment.equals(expectedAssignment) && leaderAssignment.equals(expectedAssignment), "topic assignment not completed");
} finally {
if (partitionedStreamsLeader != null) {
partitionedStreamsLeader.close();
@@ -402,19 +371,17 @@ public class RegexSourceIntegrationTest {
final AtomicBoolean expectError = new AtomicBoolean(false);
streams = new KafkaStreams(builder.build(), streamsConfiguration);
- streams.setStateListener(new KafkaStreams.StateListener() {
- @Override
- public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
- if (newState == KafkaStreams.State.ERROR)
- expectError.set(true);
+ streams.setStateListener((newState, oldState) -> {
+ if (newState == KafkaStreams.State.ERROR) {
+ expectError.set(true);
}
});
streams.start();
final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
- IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage), producerConfig, mockTime);
- IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Collections.singleton(fMessage), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Collections.singleton(fooMessage), producerConfig, mockTime);
final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 31a349b..830514b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.integration;
-import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -37,13 +36,10 @@ import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory;
@@ -62,6 +58,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.File;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -143,21 +140,16 @@ public class RestoreIntegrationTest {
builder.table(INPUT_STREAM, Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("store").withKeySerde(Serdes.Integer()).withValueSerde(Serdes.Integer()))
.toStream()
- .foreach(new ForeachAction<Integer, Integer>() {
- @Override
- public void apply(final Integer key, final Integer value) {
- if (numReceived.incrementAndGet() == 2 * offsetLimitDelta)
- shutdownLatch.countDown();
+ .foreach((key, value) -> {
+ if (numReceived.incrementAndGet() == 2 * offsetLimitDelta) {
+ shutdownLatch.countDown();
}
});
kafkaStreams = new KafkaStreams(builder.build(), props);
- kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
- @Override
- public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
- if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
- startupLatch.countDown();
- }
+ kafkaStreams.setStateListener((newState, oldState) -> {
+ if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+ startupLatch.countDown();
}
});
@@ -210,21 +202,16 @@ public class RestoreIntegrationTest {
builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("store"))
.toStream()
- .foreach(new ForeachAction<Integer, Integer>() {
- @Override
- public void apply(final Integer key, final Integer value) {
- if (numReceived.incrementAndGet() == numberOfKeys)
- shutdownLatch.countDown();
+ .foreach((key, value) -> {
+ if (numReceived.incrementAndGet() == numberOfKeys) {
+ shutdownLatch.countDown();
}
});
kafkaStreams = new KafkaStreams(builder.build(), props);
- kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
- @Override
- public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
- if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
- startupLatch.countDown();
- }
+ kafkaStreams.setStateListener((newState, oldState) -> {
+ if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+ startupLatch.countDown();
}
});
@@ -261,21 +248,15 @@ public class RestoreIntegrationTest {
final KStream<Integer, Integer> stream = builder.stream(INPUT_STREAM);
stream.groupByKey()
- .reduce(new Reducer<Integer>() {
- @Override
- public Integer apply(final Integer value1, final Integer value2) {
- return value1 + value2;
- }
- }, Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled());
+ .reduce(
+ (value1, value2) -> value1 + value2,
+ Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled());
final CountDownLatch startupLatch = new CountDownLatch(1);
kafkaStreams = new KafkaStreams(builder.build(), props(APPID));
- kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
- @Override
- public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
- if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
- startupLatch.countDown();
- }
+ kafkaStreams.setStateListener((newState, oldState) -> {
+ if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+ startupLatch.countDown();
}
});
@@ -310,24 +291,16 @@ public class RestoreIntegrationTest {
final KStream<Integer, Integer> stream = streamsBuilder.stream(INPUT_STREAM_2);
final CountDownLatch processorLatch = new CountDownLatch(3);
- stream.process(new ProcessorSupplier<Integer, Integer>() {
- @Override
- public Processor<Integer, Integer> get() {
- return new KeyValueStoreProcessor(INPUT_STREAM_2, processorLatch);
- }
- }, INPUT_STREAM_2);
+ stream.process(() -> new KeyValueStoreProcessor(INPUT_STREAM_2, processorLatch), INPUT_STREAM_2);
final Topology topology = streamsBuilder.build();
kafkaStreams = new KafkaStreams(topology, props(APPID + "-logging-disabled"));
final CountDownLatch latch = new CountDownLatch(1);
- kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
- @Override
- public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
- if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
- latch.countDown();
- }
+ kafkaStreams.setStateListener((newState, oldState) -> {
+ if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+ latch.countDown();
}
});
kafkaStreams.start();
@@ -341,12 +314,12 @@ public class RestoreIntegrationTest {
public static class KeyValueStoreProcessor implements Processor<Integer, Integer> {
- private String topic;
+ private final String topic;
private final CountDownLatch processorLatch;
private KeyValueStore<Integer, Integer> store;
- public KeyValueStoreProcessor(final String topic, final CountDownLatch processorLatch) {
+ KeyValueStoreProcessor(final String topic, final CountDownLatch processorLatch) {
this.topic = topic;
this.processorLatch = processorLatch;
}
@@ -366,9 +339,7 @@ public class RestoreIntegrationTest {
}
@Override
- public void close() {
-
- }
+ public void close() { }
}
private void createStateForRestoration(final String changelogTopic) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index badc63c..eeee3bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -55,7 +55,7 @@ public class KafkaEmbedded {
private final Properties effectiveConfig;
private final File logDir;
- public final TemporaryFolder tmpFolder;
+ private final TemporaryFolder tmpFolder;
private final KafkaServer kafka;
/**
@@ -65,6 +65,7 @@ public class KafkaEmbedded {
* broker should listen to. Note that you cannot change the `log.dirs` setting
* currently.
*/
+ @SuppressWarnings("WeakerAccess")
public KafkaEmbedded(final Properties config, final MockTime time) throws IOException {
tmpFolder = new TemporaryFolder();
tmpFolder.create();
@@ -79,16 +80,13 @@ public class KafkaEmbedded {
brokerList(), zookeeperConnect());
}
-
/**
* Creates the configuration for starting the Kafka broker by merging default values with
* overwrites.
*
* @param initialConfig Broker configuration settings that override the default config.
- * @return
- * @throws IOException
*/
- private Properties effectiveConfigFrom(final Properties initialConfig) throws IOException {
+ private Properties effectiveConfigFrom(final Properties initialConfig) {
final Properties effectiveConfig = new Properties();
effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0);
effectiveConfig.put(KafkaConfig$.MODULE$.HostNameProp(), "localhost");
@@ -109,6 +107,7 @@ public class KafkaEmbedded {
* <p>
* You can use this to tell Kafka producers and consumers how to connect to this instance.
*/
+ @SuppressWarnings("WeakerAccess")
public String brokerList() {
final Object listenerConfig = effectiveConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
return kafka.config().hostName() + ":" + kafka.boundPort(
@@ -119,6 +118,7 @@ public class KafkaEmbedded {
/**
* The ZooKeeper connection string aka `zookeeper.connect`.
*/
+ @SuppressWarnings("WeakerAccess")
public String zookeeperConnect() {
return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
}
@@ -126,6 +126,7 @@ public class KafkaEmbedded {
/**
* Stop the broker.
*/
+ @SuppressWarnings("WeakerAccess")
public void stop() {
log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...",
brokerList(), zookeeperConnect());
@@ -186,6 +187,7 @@ public class KafkaEmbedded {
}
}
+ @SuppressWarnings("WeakerAccess")
public AdminClient createAdminClient() {
final Properties adminClientConfig = new Properties();
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
@@ -198,16 +200,19 @@ public class KafkaEmbedded {
return AdminClient.create(adminClientConfig);
}
+ @SuppressWarnings("WeakerAccess")
public void deleteTopic(final String topic) {
log.debug("Deleting topic { name: {} }", topic);
try (final AdminClient adminClient = createAdminClient()) {
adminClient.deleteTopics(Collections.singletonList(topic)).all().get();
} catch (final InterruptedException | ExecutionException e) {
- if (!(e.getCause() instanceof UnknownTopicOrPartitionException))
+ if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
throw new RuntimeException(e);
+ }
}
}
+ @SuppressWarnings("WeakerAccess")
public KafkaServer kafkaServer() {
return kafka;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index f4ede7c..425f837 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -110,13 +110,13 @@ public class AbstractStreamTest {
new ProcessorParameters<>(new ExtendedKStreamDummy<>(), name),
false);
builder.addGraphNode(this.streamsGraphNode, processorNode);
- return new KStreamImpl<K, V>(name, null, null, sourceNodes, false, processorNode, builder);
+ return new KStreamImpl<>(name, null, null, sourceNodes, false, processorNode, builder);
}
}
private class ExtendedKStreamDummy<K, V> implements ProcessorSupplier<K, V> {
- private Random rand;
+ private final Random rand;
ExtendedKStreamDummy() {
rand = new Random();
@@ -131,8 +131,9 @@ public class AbstractStreamTest {
@Override
public void process(final K key, final V value) {
// flip a coin and filter
- if (rand.nextBoolean())
+ if (rand.nextBoolean()) {
context().forward(key, value);
+ }
}
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index ec7f2fc..8607902 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.perf;
import com.fasterxml.jackson.databind.ObjectMapper;
-import java.time.Duration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -27,23 +26,18 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.state.WindowStore;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -68,23 +62,26 @@ public class YahooBenchmark {
static class ProjectedEvent {
/* attributes need to be public for serializer to work */
/* main attributes */
- public String eventType;
- public String adID;
+ String eventType;
+ String adID;
/* other attributes */
- public long eventTime;
- public String userID = UUID.randomUUID().toString(); // not used
- public String pageID = UUID.randomUUID().toString(); // not used
- public String addType = "banner78"; // not used
- public String ipAddress = "1.2.3.4"; // not used
+ long eventTime;
+ /* not used
+ public String userID = UUID.randomUUID().toString();
+ public String pageID = UUID.randomUUID().toString();
+ public String addType = "banner78";
+ public String ipAddress = "1.2.3.4";
+ */
}
static class CampaignAd {
/* attributes need to be public for serializer to work */
- public String adID;
- public String campaignID;
+ String adID;
+ String campaignID;
}
+ @SuppressWarnings("WeakerAccess")
public YahooBenchmark(final SimpleBenchmark parent, final String campaignsTopic, final String eventsTopic) {
this.parent = parent;
this.campaignsTopic = campaignsTopic;
@@ -188,17 +185,17 @@ public class YahooBenchmark {
/**
* Default constructor needed by Kafka
*/
- public JsonPOJOSerializer() {
- }
+ @SuppressWarnings("WeakerAccess")
+ public JsonPOJOSerializer() {}
@Override
- public void configure(final Map<String, ?> props, final boolean isKey) {
- }
+ public void configure(final Map<String, ?> props, final boolean isKey) {}
@Override
public byte[] serialize(final String topic, final T data) {
- if (data == null)
+ if (data == null) {
return null;
+ }
try {
return objectMapper.writeValueAsBytes(data);
@@ -208,22 +205,21 @@ public class YahooBenchmark {
}
@Override
- public void close() {
- }
+ public void close() {}
}
// Note: these are also in the streams example package, eventuall use 1 file
private class JsonPOJODeserializer<T> implements Deserializer<T> {
- private ObjectMapper objectMapper = new ObjectMapper();
+ private final ObjectMapper objectMapper = new ObjectMapper();
private Class<T> tClass;
/**
* Default constructor needed by Kafka
*/
- public JsonPOJODeserializer() {
- }
+ @SuppressWarnings("WeakerAccess")
+ public JsonPOJODeserializer() {}
@SuppressWarnings("unchecked")
@Override
@@ -233,8 +229,9 @@ public class YahooBenchmark {
@Override
public T deserialize(final String topic, final byte[] bytes) {
- if (bytes == null)
+ if (bytes == null) {
return null;
+ }
final T data;
try {
@@ -268,50 +265,35 @@ public class YahooBenchmark {
Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer)));
final KTable<String, String> kCampaigns = builder.table(campaignsTopic, Consumed.with(Serdes.String(), Serdes.String()));
-
final KStream<String, ProjectedEvent> filteredEvents = kEvents
// use peek to quick when last element is processed
- .peek(new ForeachAction<String, ProjectedEvent>() {
- @Override
- public void apply(final String key, final ProjectedEvent value) {
- parent.processedRecords++;
- if (parent.processedRecords % 1000000 == 0) {
- System.out.println("Processed " + parent.processedRecords);
- }
- if (parent.processedRecords >= numRecords) {
- latch.countDown();
- }
+ .peek((key, value) -> {
+ parent.processedRecords++;
+ if (parent.processedRecords % 1000000 == 0) {
+ System.out.println("Processed " + parent.processedRecords);
}
- })
- // only keep "view" events
- .filter(new Predicate<String, ProjectedEvent>() {
- @Override
- public boolean test(final String key, final ProjectedEvent value) {
- return value.eventType.equals("view");
+ if (parent.processedRecords >= numRecords) {
+ latch.countDown();
}
})
+ // only keep "view" events
+ .filter((key, value) -> value.eventType.equals("view"))
// select just a few of the columns
- .mapValues(new ValueMapper<ProjectedEvent, ProjectedEvent>() {
- @Override
- public ProjectedEvent apply(final ProjectedEvent value) {
- final ProjectedEvent event = new ProjectedEvent();
- event.adID = value.adID;
- event.eventTime = value.eventTime;
- event.eventType = value.eventType;
- return event;
- }
+ .mapValues(value -> {
+ final ProjectedEvent event = new ProjectedEvent();
+ event.adID = value.adID;
+ event.eventTime = value.eventTime;
+ event.eventType = value.eventType;
+ return event;
});
// deserialize the add ID and campaign ID from the stored value in Kafka
- final KTable<String, CampaignAd> deserCampaigns = kCampaigns.mapValues(new ValueMapper<String, CampaignAd>() {
- @Override
- public CampaignAd apply(final String value) {
- final String[] parts = value.split(":");
- final CampaignAd cAdd = new CampaignAd();
- cAdd.adID = parts[0];
- cAdd.campaignID = parts[1];
- return cAdd;
- }
+ final KTable<String, CampaignAd> deserCampaigns = kCampaigns.mapValues(value -> {
+ final String[] parts = value.split(":");
+ final CampaignAd cAdd = new CampaignAd();
+ cAdd.adID = parts[0];
+ cAdd.campaignID = parts[1];
+ return cAdd;
});
// join the events with the campaigns
@@ -321,21 +303,15 @@ public class YahooBenchmark {
Joined.with(Serdes.String(), Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer), null)
);
-
// key by campaign rather than by ad as original
final KStream<String, String> keyedByCampaign = joined
- .selectKey(new KeyValueMapper<String, String, String>() {
- @Override
- public String apply(final String key, final String value) {
- return value;
- }
- });
+ .selectKey((key, value) -> value);
// calculate windowed counts
keyedByCampaign
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofMillis(10 * 1000)))
- .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windows"));
+ .count(Materialized.as("time-windows"));
return new KafkaStreams(builder.build(), streamConfig);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 14b94da..6a7bd02 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -677,11 +677,13 @@ public class ProcessorTopologyTest {
@Override
public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
- if (record.value().toString().matches(".*@[0-9]+"))
+ if (record.value().toString().matches(".*@[0-9]+")) {
return Long.parseLong(record.value().toString().split("@")[1]);
+ }
- if (record.timestamp() >= 0L)
+ if (record.timestamp() >= 0L) {
return record.timestamp();
+ }
return DEFAULT_TIMESTAMP;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 7c5abbe..140c219 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@@ -39,11 +38,10 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
-import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
-import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.apache.kafka.test.MockProcessorSupplier;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Test;
@@ -103,8 +101,8 @@ public class StreamsPartitionAssignorTest {
"cluster",
Collections.singletonList(Node.noNode()),
infos,
- Collections.<String>emptySet(),
- Collections.<String>emptySet());
+ Collections.emptySet(),
+ Collections.emptySet());
private final TaskId task0 = new TaskId(0, 0);
private final TaskId task1 = new TaskId(0, 1);
@@ -187,7 +185,7 @@ public class StreamsPartitionAssignorTest {
final UUID processId = UUID.randomUUID();
mockTaskManager(prevTasks, cachedTasks, processId, builder);
- configurePartitionAssignor(Collections.<String, Object>emptyMap());
+ configurePartitionAssignor(Collections.emptyMap());
final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
Collections.sort(subscription.topics());
@@ -219,7 +217,7 @@ public class StreamsPartitionAssignorTest {
final UUID uuid2 = UUID.randomUUID();
mockTaskManager(prevTasks10, standbyTasks10, uuid1, builder);
- configurePartitionAssignor(Collections.<String, Object>emptyMap());
+ configurePartitionAssignor(Collections.emptyMap());
partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
@@ -284,8 +282,8 @@ public class StreamsPartitionAssignorTest {
"cluster",
Collections.singletonList(Node.noNode()),
localInfos,
- Collections.<String>emptySet(),
- Collections.<String>emptySet());
+ Collections.emptySet(),
+ Collections.emptySet());
final List<String> topics = asList("topic1", "topic2");
@@ -301,16 +299,16 @@ public class StreamsPartitionAssignorTest {
final UUID uuid1 = UUID.randomUUID();
- mockTaskManager(new HashSet<TaskId>(), new HashSet<TaskId>(), uuid1, builder);
- configurePartitionAssignor(Collections.<String, Object>emptyMap());
+ mockTaskManager(new HashSet<>(), new HashSet<>(), uuid1, builder);
+ configurePartitionAssignor(Collections.emptyMap());
partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
subscriptions.put("consumer10",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<TaskId>(), new HashSet<TaskId>(), userEndPoint).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode()));
subscriptions.put("consumer11",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<TaskId>(), new HashSet<TaskId>(), userEndPoint).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode()));
final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(localMetadata, subscriptions);
@@ -344,13 +342,13 @@ public class StreamsPartitionAssignorTest {
final UUID uuid1 = UUID.randomUUID();
- mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, builder);
- configurePartitionAssignor(Collections.singletonMap(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, (Object) SingleGroupPartitionGrouperStub.class));
+ mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder);
+ configurePartitionAssignor(Collections.singletonMap(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class));
partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
subscriptions.put("consumer10",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode()));
// will throw exception if it fails
@@ -376,13 +374,13 @@ public class StreamsPartitionAssignorTest {
final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
final Cluster emptyMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()),
- Collections.<PartitionInfo>emptySet(),
- Collections.<String>emptySet(),
- Collections.<String>emptySet());
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet());
final UUID uuid1 = UUID.randomUUID();
mockTaskManager(prevTasks10, standbyTasks10, uuid1, builder);
- configurePartitionAssignor(Collections.<String, Object>emptyMap());
+ configurePartitionAssignor(Collections.emptyMap());
final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
subscriptions.put("consumer10",
@@ -392,15 +390,15 @@ public class StreamsPartitionAssignorTest {
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(emptyMetadata, subscriptions);
// check assigned partitions
- assertEquals(Collections.<TopicPartition>emptySet(),
+ assertEquals(Collections.emptySet(),
new HashSet<>(assignments.get("consumer10").partitions()));
// check assignment info
- AssignmentInfo info10 = checkAssignment(Collections.<String>emptySet(), assignments.get("consumer10"));
+ AssignmentInfo info10 = checkAssignment(Collections.emptySet(), assignments.get("consumer10"));
final Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks());
assertEquals(0, allActiveTasks.size());
- assertEquals(Collections.<TaskId>emptySet(), new HashSet<>(allActiveTasks));
+ assertEquals(Collections.emptySet(), new HashSet<>(allActiveTasks));
// then metadata gets populated
assignments = partitionAssignor.assign(metadata, subscriptions);
@@ -435,18 +433,18 @@ public class StreamsPartitionAssignorTest {
final UUID uuid1 = UUID.randomUUID();
final UUID uuid2 = UUID.randomUUID();
- mockTaskManager(prevTasks10, Collections.<TaskId>emptySet(), uuid1, builder);
- configurePartitionAssignor(Collections.<String, Object>emptyMap());
+ mockTaskManager(prevTasks10, Collections.emptySet(), uuid1, builder);
+ configurePartitionAssignor(Collections.emptyMap());
partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
subscriptions.put("consumer10",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet(), userEndPoint).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.emptySet(), userEndPoint).encode()));
subscriptions.put("consumer11",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet(), userEndPoint).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.emptySet(), userEndPoint).encode()));
subscriptions.put("consumer20",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet(), userEndPoint).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.emptySet(), userEndPoint).encode()));
final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
@@ -496,21 +494,21 @@ public class StreamsPartitionAssignorTest {
final UUID uuid2 = UUID.randomUUID();
mockTaskManager(
- Collections.<TaskId>emptySet(),
- Collections.<TaskId>emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
uuid1,
builder);
- configurePartitionAssignor(Collections.<String, Object>emptyMap());
+ configurePartitionAssignor(Collections.emptyMap());
partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
subscriptions.put("consumer10",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode()));
subscriptions.put("consumer11",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode()));
subscriptions.put("consumer20",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode()));
final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
@@ -552,8 +550,9 @@ public class StreamsPartitionAssignorTest {
if (stateChangelogTopics.contains(changelogTopic)) {
for (final TaskId id : tasks) {
- if (id.topicGroupId == entry.getKey())
+ if (id.topicGroupId == entry.getKey()) {
ids.add(id);
+ }
}
}
}
@@ -585,7 +584,7 @@ public class StreamsPartitionAssignorTest {
mockTaskManager(prevTasks00, standbyTasks01, uuid1, builder);
- configurePartitionAssignor(Collections.<String, Object>singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+ configurePartitionAssignor(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
@@ -631,7 +630,7 @@ public class StreamsPartitionAssignorTest {
@Test
public void testOnAssignment() {
- configurePartitionAssignor(Collections.<String, Object>emptyMap());
+ configurePartitionAssignor(Collections.emptyMap());
final List<TaskId> activeTaskList = asList(task0, task3);
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -677,8 +676,8 @@ public class StreamsPartitionAssignorTest {
final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
final UUID uuid1 = UUID.randomUUID();
- mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, builder);
- configurePartitionAssignor(Collections.<String, Object>emptyMap());
+ mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder);
+ configurePartitionAssignor(Collections.emptyMap());
final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
partitionAssignor.setInternalTopicManager(internalTopicManager);
@@ -711,9 +710,9 @@ public class StreamsPartitionAssignorTest {
final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
final UUID uuid1 = UUID.randomUUID();
- mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, builder);
+ mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder);
- configurePartitionAssignor(Collections.<String, Object>emptyMap());
+ configurePartitionAssignor(Collections.emptyMap());
final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
partitionAssignor.setInternalTopicManager(internalTopicManager);
@@ -737,23 +736,13 @@ public class StreamsPartitionAssignorTest {
final KStream<Object, Object> stream1 = builder
.stream("topic1")
// force creation of internal repartition topic
- .map(new KeyValueMapper<Object, Object, KeyValue<Object, Object>>() {
- @Override
- public KeyValue<Object, Object> apply(final Object key, final Object value) {
- return new KeyValue<>(key, value);
- }
- });
+ .map((KeyValueMapper<Object, Object, KeyValue<Object, Object>>) KeyValue::new);
// KTable with 4 partitions
final KTable<Object, Long> table1 = builder
.table("topic3")
// force creation of internal repartition topic
- .groupBy(new KeyValueMapper<Object, Object, KeyValue<Object, Object>>() {
- @Override
- public KeyValue<Object, Object> apply(final Object key, final Object value) {
- return new KeyValue<>(key, value);
- }
- })
+ .groupBy(KeyValue::new)
.count();
// joining the stream and the table
@@ -761,12 +750,7 @@ public class StreamsPartitionAssignorTest {
// forcing the stream.map to get repartitioned to a topic with four partitions.
stream1.join(
table1,
- new ValueJoiner() {
- @Override
- public Object apply(final Object value1, final Object value2) {
- return null;
- }
- });
+ (ValueJoiner) (value1, value2) -> null);
final UUID uuid = UUID.randomUUID();
final String client = "client1";
@@ -774,11 +758,11 @@ public class StreamsPartitionAssignorTest {
internalTopologyBuilder.setApplicationId(applicationId);
mockTaskManager(
- Collections.<TaskId>emptySet(),
- Collections.<TaskId>emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
UUID.randomUUID(),
internalTopologyBuilder);
- configurePartitionAssignor(Collections.<String, Object>emptyMap());
+ configurePartitionAssignor(Collections.emptyMap());
final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
streamsConfig,
@@ -837,11 +821,11 @@ public class StreamsPartitionAssignorTest {
final UUID uuid1 = UUID.randomUUID();
mockTaskManager(
- Collections.<TaskId>emptySet(),
- Collections.<TaskId>emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
uuid1,
builder);
- configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) userEndPoint));
+ configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
assertEquals("localhost:8080", subscriptionInfo.userEndPoint());
@@ -854,12 +838,12 @@ public class StreamsPartitionAssignorTest {
builder.addProcessor("processor", new MockProcessorSupplier(), "source");
builder.addSink("sink", "output", null, null, null, "processor");
- final List<String> topics = asList("topic1");
+ final List<String> topics = Collections.singletonList("topic1");
final UUID uuid1 = UUID.randomUUID();
- mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, builder);
- configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) userEndPoint));
+ mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder);
+ configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
@@ -884,11 +868,11 @@ public class StreamsPartitionAssignorTest {
public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
builder.setApplicationId(applicationId);
- mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), UUID.randomUUID(), builder);
+ mockTaskManager(Collections.emptySet(), Collections.emptySet(), UUID.randomUUID(), builder);
partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
try {
- configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost"));
+ configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost"));
fail("expected to an exception due to invalid config");
} catch (final ConfigException e) {
// pass
@@ -900,7 +884,7 @@ public class StreamsPartitionAssignorTest {
builder.setApplicationId(applicationId);
try {
- configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost:j87yhk"));
+ configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:j87yhk"));
fail("expected to an exception due to invalid config");
} catch (final ConfigException e) {
// pass
@@ -911,34 +895,22 @@ public class StreamsPartitionAssignorTest {
public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
final StreamsBuilder builder = new StreamsBuilder();
-
-
final KStream<Object, Object> stream1 = builder
// Task 1 (should get created):
.stream("topic1")
// force repartitioning for aggregation
- .selectKey(new KeyValueMapper<Object, Object, Object>() {
- @Override
- public Object apply(final Object key, final Object value) {
- return null;
- }
- })
+ .selectKey((key, value) -> null)
.groupByKey()
// Task 2 (should get created):
// create repartitioning and changelog topic as task 1 exists
- .count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count"))
+ .count(Materialized.as("count"))
// force repartitioning for join, but second join input topic unknown
// -> internal repartitioning topic should not get created
.toStream()
- .map(new KeyValueMapper<Object, Long, KeyValue<Object, Object>>() {
- @Override
- public KeyValue<Object, Object> apply(final Object key, final Long value) {
- return null;
- }
- });
+ .map((KeyValueMapper<Object, Long, KeyValue<Object, Object>>) (key, value) -> null);
builder
// Task 3 (should not get created because input topic unknown)
@@ -946,23 +918,13 @@ public class StreamsPartitionAssignorTest {
// force repartitioning for join, but input topic unknown
// -> thus should not create internal repartitioning topic
- .selectKey(new KeyValueMapper<Object, Object, Object>() {
- @Override
- public Object apply(final Object key, final Object value) {
- return null;
- }
- })
+ .selectKey((key, value) -> null)
// Task 4 (should not get created because input topics unknown)
// should not create any of both input repartition topics or any of both changelog topics
.join(
stream1,
- new ValueJoiner() {
- @Override
- public Object apply(final Object value1, final Object value2) {
- return null;
- }
- },
+ (ValueJoiner) (value1, value2) -> null,
JoinWindows.of(ofMillis(0))
);
@@ -973,11 +935,11 @@ public class StreamsPartitionAssignorTest {
internalTopologyBuilder.setApplicationId(applicationId);
mockTaskManager(
- Collections.<TaskId>emptySet(),
- Collections.<TaskId>emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
UUID.randomUUID(),
internalTopologyBuilder);
- configurePartitionAssignor(Collections.<String, Object>emptyMap());
+ configurePartitionAssignor(Collections.emptyMap());
final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
streamsConfig,
@@ -1008,7 +970,7 @@ public class StreamsPartitionAssignorTest {
final Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(
new HostInfo("localhost", 9090), Utils.mkSet(partitionOne, partitionTwo));
- configurePartitionAssignor(Collections.<String, Object>emptyMap());
+ configurePartitionAssignor(Collections.emptyMap());
taskManager.setPartitionsByHostState(hostState);
EasyMock.expectLastCall();
@@ -1030,8 +992,8 @@ public class StreamsPartitionAssignorTest {
final UUID uuid = UUID.randomUUID();
mockTaskManager(
- Collections.<TaskId>emptySet(),
- Collections.<TaskId>emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
uuid,
internalTopologyBuilder);
@@ -1183,7 +1145,7 @@ public class StreamsPartitionAssignorTest {
emptyTasks,
UUID.randomUUID(),
builder);
- configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, (Object) StreamsConfig.UPGRADE_FROM_0100));
+ configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100));
final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
@@ -1274,7 +1236,7 @@ public class StreamsPartitionAssignorTest {
equalTo(new AssignmentInfo(
new ArrayList<>(activeTasks),
standbyTaskMap,
- Collections.<HostInfo, Set<TopicPartition>>emptyMap()
+ Collections.emptyMap()
)));
assertThat(assignment.get("consumer1").partitions(), equalTo(asList(t1p0, t1p1)));
@@ -1334,12 +1296,12 @@ public class StreamsPartitionAssignorTest {
}
private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
- final AssignmentInfo info = new AssignmentInfo(Collections.<TaskId>emptyList(),
- Collections.<TaskId, Set<TopicPartition>>emptyMap(),
+ final AssignmentInfo info = new AssignmentInfo(Collections.emptyList(),
+ Collections.emptyMap(),
firstHostState);
return new PartitionAssignor.Assignment(
- Collections.<TopicPartition>emptyList(), info.encode());
+ Collections.emptyList(), info.encode());
}
private AssignmentInfo checkAssignment(final Set<String> expectedTopics,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index 79afb78..6df41fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -50,8 +50,7 @@ public class CompositeReadOnlyWindowStoreTest {
private CompositeReadOnlyWindowStore<String, String>
windowStore;
private ReadOnlyWindowStoreStub<String, String> underlyingWindowStore;
- private ReadOnlyWindowStoreStub<String, String>
- otherUnderlyingStore;
+ private ReadOnlyWindowStoreStub<String, String> otherUnderlyingStore;
@Rule
public final ExpectedException windowStoreIteratorException = ExpectedException.none();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index 5f18be9..aad7403 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -16,10 +16,9 @@
*/
package org.apache.kafka.streams.state.internals;
-import java.time.Instant;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -28,6 +27,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -48,7 +48,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
private final Map<Long, NavigableMap<K, V>> data = new HashMap<>();
private boolean open = true;
- public ReadOnlyWindowStoreStub(final long windowSize) {
+ ReadOnlyWindowStoreStub(final long windowSize) {
this.windowSize = windowSize;
}
@@ -80,9 +80,10 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
@Override
public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
- ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
- return fetch(key, from.toEpochMilli(), to.toEpochMilli());
+ return fetch(
+ key,
+ ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
+ ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
}
@Override
@@ -103,9 +104,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
return new KeyValueIterator<Windowed<K>, V>() {
@Override
- public void close() {
-
- }
+ public void close() {}
@Override
public Windowed<K> peekNextKey() {
@@ -138,7 +137,9 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
}
final List<KeyValue<Windowed<K>, V>> results = new ArrayList<>();
for (final long now : data.keySet()) {
- if (!(now >= timeFrom && now <= timeTo)) continue;
+ if (!(now >= timeFrom && now <= timeTo)) {
+ continue;
+ }
final NavigableMap<K, V> kvMap = data.get(now);
if (kvMap != null) {
for (final Entry<K, V> entry : kvMap.entrySet()) {
@@ -150,9 +151,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
return new KeyValueIterator<Windowed<K>, V>() {
@Override
- public void close() {
-
- }
+ public void close() {}
@Override
public Windowed<K> peekNextKey() {
@@ -179,9 +178,9 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
- ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
- return fetchAll(from.toEpochMilli(), to.toEpochMilli());
+ return fetchAll(
+ ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
+ ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
}
@SuppressWarnings("deprecation")
@@ -203,9 +202,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
return new KeyValueIterator<Windowed<K>, V>() {
@Override
- public void close() {
-
- }
+ public void close() {}
@Override
public Windowed<K> peekNextKey() {
@@ -234,14 +231,16 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
final K to,
final Instant fromTime,
final Instant toTime) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime"));
- ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime"));
- return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
+ return fetch(
+ from,
+ to,
+ ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime")),
+ ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime")));
}
public void put(final K key, final V value, final long timestamp) {
if (!data.containsKey(timestamp)) {
- data.put(timestamp, new TreeMap<K, V>());
+ data.put(timestamp, new TreeMap<>());
}
data.get(timestamp).put(key, value);
}
@@ -252,19 +251,13 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
}
@Override
- public void init(final ProcessorContext context, final StateStore root) {
-
- }
+ public void init(final ProcessorContext context, final StateStore root) {}
@Override
- public void flush() {
-
- }
+ public void flush() {}
@Override
- public void close() {
-
- }
+ public void close() {}
@Override
public boolean persistent() {
@@ -276,7 +269,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
return open;
}
- public void setOpen(final boolean open) {
+ void setOpen(final boolean open) {
this.open = open;
}
@@ -289,9 +282,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
}
@Override
- public void close() {
-
- }
+ public void close() {}
@Override
public Long peekNextKey() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 7087298..078cbe4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -50,7 +50,7 @@ import java.util.concurrent.TimeUnit;
public class SmokeTestDriver extends SmokeTestUtil {
- public static final int MAX_RECORD_EMPTY_RETRIES = 60;
+ private static final int MAX_RECORD_EMPTY_RETRIES = 60;
private static class ValueList {
public final String key;
@@ -85,16 +85,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
final int numKeys = 20;
final int maxRecordsPerKey = 1000;
- final Thread driver = new Thread() {
- public void run() {
- try {
- final Map<String, Set<Integer>> allData = generate(kafka, numKeys, maxRecordsPerKey);
- verify(kafka, allData, maxRecordsPerKey);
- } catch (final Exception ex) {
- ex.printStackTrace();
- }
+ final Thread driver = new Thread(() -> {
+ try {
+ final Map<String, Set<Integer>> allData = generate(kafka, numKeys, maxRecordsPerKey);
+ verify(kafka, allData, maxRecordsPerKey);
+ } catch (final Exception ex) {
+ ex.printStackTrace();
}
- };
+ });
final Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
@@ -601,10 +599,11 @@ public class SmokeTestDriver extends SmokeTestUtil {
for (final Map.Entry<String, Long> entry : map.entrySet()) {
final String key = entry.getKey();
Long expectedCount = expected.remove(key);
- if (expectedCount == null)
+ if (expectedCount == null) {
expectedCount = 0L;
+ }
- if (entry.getValue() != expectedCount) {
+ if (entry.getValue().longValue() != expectedCount.longValue()) {
if (print) {
System.out.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expected.get(key));
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 4c3a6b2..7b4c58b 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -16,19 +16,18 @@
*/
package org.apache.kafka.streams.processor;
-import java.time.Duration;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.internals.QuietStreamsConfig;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueTransformer;
@@ -37,6 +36,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import java.io.File;
+import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -124,7 +124,9 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
private final KeyValue keyValue;
private CapturedForward(final To to, final KeyValue keyValue) {
- if (keyValue == null) throw new IllegalArgumentException();
+ if (keyValue == null) {
+ throw new IllegalArgumentException();
+ }
this.childName = to.childName;
this.timestamp = to.timestamp;
@@ -396,12 +398,12 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
return capturedPunctuator::cancel;
}
+ @SuppressWarnings("deprecation")
@Override
public Cancellable schedule(final Duration interval,
final PunctuationType type,
final Punctuator callback) throws IllegalArgumentException {
- ApiUtils.validateMillisecondDuration(interval, "interval");
- return schedule(interval.toMillis(), type, callback);
+ return schedule(ApiUtils.validateMillisecondDuration(interval, "interval"), type, callback);
}
/**
@@ -411,9 +413,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public List<CapturedPunctuator> scheduledPunctuators() {
- final LinkedList<CapturedPunctuator> capturedPunctuators = new LinkedList<>();
- capturedPunctuators.addAll(punctuators);
- return capturedPunctuators;
+ return new LinkedList<>(punctuators);
}
@SuppressWarnings("unchecked")
@@ -458,11 +458,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
*
* @return A list of key/value pairs that were previously passed to the context.
*/
- @SuppressWarnings({"WeakerAccess", "unused"})
public List<CapturedForward> forwarded() {
- final LinkedList<CapturedForward> result = new LinkedList<>();
- result.addAll(capturedForwards);
- return result;
+ return new LinkedList<>(capturedForwards);
}
/**
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 7fdfdbe..41b62f9 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams;
-import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.MockProcessorContext;
@@ -32,11 +31,13 @@ import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.junit.Test;
import java.io.File;
+import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -120,7 +121,7 @@ public class MockProcessorContextTest {
final CapturedForward forward1 = forwarded.next();
assertEquals(new KeyValue<>("start", -1L), forward1.keyValue());
- assertEquals(null, forward1.childName());
+ assertNull(forward1.childName());
final CapturedForward forward2 = forwarded.next();
assertEquals(new KeyValue<>("foo5", 8L), forward2.keyValue());
@@ -205,7 +206,9 @@ public class MockProcessorContextTest {
@Override
public void process(final String key, final Long value) {
- if (++count > 2) context().commit();
+ if (++count > 2) {
+ context().commit();
+ }
}
};
|