Repository: kafka Updated Branches: refs/heads/trunk 80e0af50d -> bd0146d98 MINOR: various random minor fixes and improve KafkaConsumer JavaDocs Author: Matthias J. Sax Reviewers: Damian Guy , Jason Gustafson , Guozhang Wang Closes #3884 from mjsax/minor-fixed-discoverd-via-exception-handling-investigation Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bd0146d9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bd0146d9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bd0146d9 Branch: refs/heads/trunk Commit: bd0146d984dd5df82fb19aa936e8f4ff9ca40030 Parents: 80e0af5 Author: Matthias J. Sax Authored: Wed Sep 20 07:13:03 2017 +0800 Committer: Guozhang Wang Committed: Wed Sep 20 07:13:03 2017 +0800 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 104 ++++++++++------ .../producer/BufferExhaustedException.java | 2 +- .../kafka/clients/producer/KafkaProducer.java | 15 ++- .../apache/kafka/clients/producer/Producer.java | 33 +++--- .../kafka/common/metrics/KafkaMetric.java | 1 - .../main/scala/kafka/tools/StreamsResetter.java | 2 - .../org/apache/kafka/streams/KafkaStreams.java | 118 ++++++++++--------- .../org/apache/kafka/streams/StreamsConfig.java | 8 +- .../internals/AbstractProcessorContext.java | 15 +-- .../processor/internals/AbstractTask.java | 1 + .../processor/internals/AssignedTasks.java | 8 +- .../internals/InternalTopologyBuilder.java | 2 +- .../processor/internals/RecordQueue.java | 28 ++--- .../internals/SourceNodeRecordDeserializer.java | 4 +- .../processor/internals/StandbyTask.java | 37 +++--- .../internals/StoreChangelogReader.java | 9 +- .../streams/processor/internals/StreamTask.java | 21 ++-- .../processor/internals/StreamThread.java | 26 ++-- .../processor/internals/TaskManager.java | 20 +--- 19 files changed, 251 insertions(+), 203 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 76e4073..c2f2f5f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -112,7 +112,7 @@ import java.util.regex.Pattern; * Kafka uses the concept of consumer groups to allow a pool of processes to divide the work of consuming and * processing records. These processes can either be running on the same machine or they can be * distributed over many machines to provide scalability and fault tolerance for processing. All consumer instances - * sharing the same group.id will be part of the same consumer group. + * sharing the same {@code group.id} will be part of the same consumer group. *

* Each consumer in a group can dynamically set the list of topics it wants to subscribe to through one of the * {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe} APIs. Kafka will deliver each message in the @@ -152,12 +152,12 @@ import java.util.regex.Pattern; * invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer * will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, * the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for - * a duration of session.timeout.ms, then the consumer will be considered dead and its partitions will + * a duration of {@code session.timeout.ms}, then the consumer will be considered dead and its partitions will * be reassigned. *

* It is also possible that the consumer could encounter a "livelock" situation where it is continuing * to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions - * indefinitely in this case, we provide a liveness detection mechanism using the max.poll.interval.ms + * indefinitely in this case, we provide a liveness detection mechanism using the {@code max.poll.interval.ms} * setting. Basically if you don't call poll at least as frequently as the configured max interval, * then the client will proactively leave the group so that another consumer can take over its partitions. When this happens, * you may see an offset commit failure (as indicated by a {@link CommitFailedException} thrown from a call to {@link #commitSync()}). @@ -211,15 +211,15 @@ import java.util.regex.Pattern; * * * The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the - * configuration bootstrap.servers. This list is just used to discover the rest of the brokers in the + * configuration {@code >bootstrap.servers}. This list is just used to discover the rest of the brokers in the * cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in * case there are servers down when the client is connecting). *

- * Setting enable.auto.commit means that offsets are committed automatically with a frequency controlled by - * the config auto.commit.interval.ms. + * Setting {@code enable.auto.commit} means that offsets are committed automatically with a frequency controlled by + * the config {@code auto.commit.interval.ms}. *

* In this example the consumer is subscribing to the topics foo and bar as part of a group of consumers - * called test as configured with group.id. + * called test as configured with {@code group.id}. *

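The configuration walked through in this paragraph corresponds to a consumer roughly like the following sketch; the broker address and the poll timeout are placeholders:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class AutoCommitConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder broker list
            props.put("group.id", "test");                      // consumer group from the text above
            props.put("enable.auto.commit", "true");            // offsets committed automatically ...
            props.put("auto.commit.interval.ms", "1000");       // ... at this frequency
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList("foo", "bar"));
                while (true) {
                    // Regular poll() calls keep the consumer alive in its group.
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset = %d, key = %s, value = %s%n",
                                record.offset(), record.key(), record.value());
                    }
                }
            }
        }
    }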
* The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we * are saying that our record's key and value will just be simple strings. @@ -423,8 +423,7 @@ import java.util.regex.Pattern; *

* Transactions were introduced in Kafka 0.11.0 wherein applications can write to multiple topics and partitions atomically. * In order for this to work, consumers reading from these partitions should be configured to only read committed data. - * This can be achieved by by setting the isolation.level=read_committed in the consumer's configuration. - *

+ * This can be achieved by setting the {@code isolation.level=read_committed} in the consumer's configuration. * *

* In read_committed mode, the consumer will read only those transactional messages which have been @@ -433,17 +432,19 @@ import java.util.regex.Pattern; * consumer would be the offset of the first message in the partition belonging to an open transaction. This offset * is known as the 'Last Stable Offset'(LSO).

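A minimal sketch of the consumer-side setting being described; the broker address and group id are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ReadCommittedConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder
            props.put("group.id", "read-committed-group");     // placeholder
            props.put("isolation.level", "read_committed");    // only return committed transactional messages
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // poll() on this consumer stops at the Last Stable Offset (LSO) and skips aborted records.
            consumer.close();
        }
    }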
 *
- * A read_committed consumer will only read up till the LSO and filter out any transactional
+ *
+ * A {@code read_committed} consumer will only read up to the LSO and filter out any transactional * messages which have been aborted. The LSO also affects the behavior of {@link #seekToEnd(Collection)} and - * {@link #endOffsets(Collection)} for read_committed consumers, details of which are in each method's documentation. - * Finally, the fetch lag metrics are also adjusted to be relative to the LSO for read_committed consumers.

 *
- * Partitions with transactional messages will include commit or abort markers which indicate the result of a transaction.
+ *
+ * Partitions with transactional messages will include commit or abort markers which indicate the result of a transaction. * There markers are not returned to applications, yet have an offset in the log. As a result, applications reading from * topics with transactional messages will see gaps in the consumed offsets. These missing messages would be the transaction * markers, and they are filtered out for consumers in both isolation levels. Additionally, applications using - * read_committed consumers may also see gaps due to aborted transactions, since those messages would not - * be returned by the consumer and yet would have valid offsets.

 *
 * Multi-threaded Processing
* @@ -869,7 +870,9 @@ public class KafkaConsumer implements Consumer { * @param topics The list of topics to subscribe to * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the * subscribed topics - * @throws IllegalArgumentException If topics is null or contains null or empty elements + * @throws IllegalArgumentException If topics is null or contains null or empty elements, or if listener is null + * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called + * previously (without a subsequent call to {@link #unsubscribe()}) */ @Override public void subscribe(Collection topics, ConsumerRebalanceListener listener) { @@ -911,6 +914,8 @@ public class KafkaConsumer implements Consumer { * * @param topics The list of topics to subscribe to * @throws IllegalArgumentException If topics is null or contains null or empty elements + * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called + * previously (without a subsequent call to {@link #unsubscribe()}) */ @Override public void subscribe(Collection topics) { @@ -918,9 +923,8 @@ public class KafkaConsumer implements Consumer { } /** - * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. The pattern matching will be done periodically against topics - * existing at the time of check. - * + * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. + * The pattern matching will be done periodically against topic existing at the time of check. *

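A brief sketch of pattern subscription with a rebalance listener, assuming an already-configured consumer that has a group.id; the pattern and the listener bodies are placeholders:

    import java.util.Collection;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PatternSubscribeSketch {
        static void subscribeByPattern(KafkaConsumer<String, String> consumer) {
            // Matches topics existing at each periodic check, e.g. test-1, test-2, ...
            consumer.subscribe(Pattern.compile("test-.*"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // e.g. commit or save offsets for partitions being taken away
                }
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // e.g. seek to previously saved offsets for the new assignment
                }
            });
        }
    }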
* As part of group management, the consumer will keep track of the list of consumers that * belong to a particular group and will trigger a rebalance operation if one of the @@ -935,7 +939,9 @@ public class KafkaConsumer implements Consumer { * @param pattern Pattern to subscribe to * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the * subscribed topics - * @throws IllegalArgumentException If pattern is null + * @throws IllegalArgumentException If pattern or listener is null + * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called + * previously (without a subsequent call to {@link #unsubscribe()}) */ @Override public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { @@ -956,7 +962,6 @@ public class KafkaConsumer implements Consumer { /** * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. * The pattern matching will be done periodically against topics existing at the time of check. - * *

* This is a short-hand for {@link #subscribe(Pattern, ConsumerRebalanceListener)}, which * uses a noop listener. If you need the ability to seek to particular offsets, you should prefer @@ -966,6 +971,8 @@ public class KafkaConsumer implements Consumer { * * @param pattern Pattern to subscribe to * @throws IllegalArgumentException If pattern is null + * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called + * previously (without a subsequent call to {@link #unsubscribe()}) */ @Override public void subscribe(Pattern pattern) { @@ -973,8 +980,8 @@ public class KafkaConsumer implements Consumer { } /** - * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)}. This - * also clears any partitions directly assigned through {@link #assign(Collection)}. + * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}. + * This also clears any partitions directly assigned through {@link #assign(Collection)}. */ public void unsubscribe() { acquireAndEnsureOpen(); @@ -991,17 +998,21 @@ public class KafkaConsumer implements Consumer { /** * Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment * and will replace the previous assignment (if there is one). - * + *

* If the given list of topic partitions is empty, it is treated the same as {@link #unsubscribe()}. - * *

* Manual topic assignment through this method does not use the consumer's group management * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic * metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)} * and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}. + *

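A short sketch of manual assignment as described above, assuming a topic named "foo" with at least two partitions:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class ManualAssignSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Explicit partitions; no group management or rebalancing is involved.
                TopicPartition p0 = new TopicPartition("foo", 0);
                TopicPartition p1 = new TopicPartition("foo", 1);
                consumer.assign(Arrays.asList(p0, p1));
                consumer.seekToBeginning(Arrays.asList(p0, p1));  // optional: pick an explicit start position
            }
        }
    }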
+ * If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new + * assignment replaces the old one. * * @param partitions The list of partitions to assign this consumer * @throws IllegalArgumentException If partitions is null or contains null or empty topics + * @throws IllegalStateException If {@code subscribe()} is called previously with topics or pattern + * (without a subsequent call to {@link #unsubscribe()}) */ @Override public void assign(Collection partitions) { @@ -1289,6 +1300,9 @@ public class KafkaConsumer implements Consumer { * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets + * + * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer + * or if provided offset is negative */ @Override public void seek(TopicPartition partition, long offset) { @@ -1307,11 +1321,16 @@ public class KafkaConsumer implements Consumer { /** * Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the * first offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called. - * If no partition is provided, seek to the first offset for all of the currently assigned partitions. + * If no partitions are provided, seek to the first offset for all of the currently assigned partitions. + * + * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer */ public void seekToBeginning(Collection partitions) { acquireAndEnsureOpen(); try { + if (partitions == null) { + throw new IllegalArgumentException("Partitions collection cannot be null"); + } Collection parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions; for (TopicPartition tp : parts) { log.debug("Seeking to beginning of partition {}", tp); @@ -1325,14 +1344,19 @@ public class KafkaConsumer implements Consumer { /** * Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the * final offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called. - * If no partition is provided, seek to the final offset for all of the currently assigned partitions. - * - * If isolation.level=read_committed, the end offset will be the Last Stable Offset, ie. the offset + * If no partitions are provided, seek to the final offset for all of the currently assigned partitions. + *

+ * If {@code isolation.level=read_committed}, the end offset will be the Last Stable Offset, i.e., the offset * of the first message with an open transaction. + * + * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer */ public void seekToEnd(Collection partitions) { acquireAndEnsureOpen(); try { + if (partitions == null) { + throw new IllegalArgumentException("Partitions collection cannot be null"); + } Collection parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions; for (TopicPartition tp : parts) { log.debug("Seeking to end of partition {}", tp); @@ -1348,6 +1372,7 @@ public class KafkaConsumer implements Consumer { * * @param partition The partition to get the position for * @return The offset + * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for * the partition * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this @@ -1471,6 +1496,7 @@ public class KafkaConsumer implements Consumer { * Note that this method does not affect partition subscription. In particular, it does not cause a group * rebalance when automatic assignment is used. * @param partitions The partitions which should be paused + * @throws IllegalStateException if one of the provided partitions is not assigned to this consumer */ @Override public void pause(Collection partitions) { @@ -1490,6 +1516,7 @@ public class KafkaConsumer implements Consumer { * {@link #poll(long)} will return records from these partitions if there are any to be fetched. * If the partitions were not previously paused, this method is a no-op. * @param partitions The partitions which should be resumed + * @throws IllegalStateException if one of the provided partitions is not assigned to this consumer */ @Override public void resume(Collection partitions) { @@ -1534,6 +1561,8 @@ public class KafkaConsumer implements Consumer { * than or equal to the target timestamp. {@code null} will be returned for the partition if there is no * such message. * @throws IllegalArgumentException if the target timestamp is negative. + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before + * expiration of the configured request timeout * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up * the offsets by timestamp. */ @@ -1564,6 +1593,8 @@ public class KafkaConsumer implements Consumer { * * @param partitions the partitions to get the earliest offsets. * @return The earliest available offsets for the given partitions + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before + * expiration of the configured request timeout */ @Override public Map beginningOffsets(Collection partitions) { @@ -1581,16 +1612,17 @@ public class KafkaConsumer implements Consumer { *

* Notice that this method may block indefinitely if the partition does not exist. * This method does not change the current consumer position of the partitions. - *

- *
- * When isolation.level=read_committed the last offset will be the Last Stable Offset (LSO).
- * This is the offset of the first message with an open transaction. The LSO moves forward as transactions
- * are completed.
+ *
+ * When {@code isolation.level=read_committed} the last offset will be the Last Stable Offset (LSO).
+ * This is the offset of the first message with an open transaction. The LSO moves forward as transactions
+ * are completed. * * @see #seekToEnd(Collection) * * @param partitions the partitions to get the end offsets. * @return The end offsets for the given partitions. + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before + * expiration of the configured request timeout */ @Override public Map endOffsets(Collection partitions) { @@ -1619,7 +1651,7 @@ public class KafkaConsumer implements Consumer { /** * Tries to close the consumer cleanly within the specified timeout. This method waits up to - * timeout for the consumer to complete pending commits and leave the group. + * {@code timeout} for the consumer to complete pending commits and leave the group. * If auto-commit is enabled, this will commit the current offsets if possible within the * timeout. If the consumer is unable to complete offset commits and gracefully leave the group * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be @@ -1627,9 +1659,9 @@ public class KafkaConsumer implements Consumer { * * @param timeout The maximum time to wait for consumer to close gracefully. The value must be * non-negative. Specifying a timeout of zero means do not wait for pending requests to complete. - * @param timeUnit The time unit for the timeout + * @param timeUnit The time unit for the {@code timeout} * @throws InterruptException If the thread is interrupted before or while this function is called - * @throws IllegalArgumentException If the timeout is negative. + * @throws IllegalArgumentException If the {@code timeout} is negative. */ public void close(long timeout, TimeUnit timeUnit) { if (timeout < 0) http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java index 929b6b9..be840db 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.KafkaException; /** * This exception is thrown if the producer is in non-blocking mode and the rate of data production exceeds the rate at - * which data can be sent for long enough for the alloted buffer to be exhausted. + * which data can be sent for long enough for the allocated buffer to be exhausted. 
*/ public class BufferExhaustedException extends KafkaException { http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 7d51640..7dcec5c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -20,7 +20,9 @@ import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.producer.internals.ProducerInterceptors; import org.apache.kafka.clients.producer.internals.ProducerMetrics; import org.apache.kafka.clients.producer.internals.RecordAccumulator; @@ -564,12 +566,17 @@ public class KafkaProducer implements Producer { } /** - * Sends a list of consumed offsets to the consumer group coordinator, and also marks + * Sends a list of specified offsets to the consumer group coordinator, and also marks * those offsets as part of the current transaction. These offsets will be considered - * consumed only if the transaction is committed successfully. - * + * committed only if the transaction is committed successfully. The committed offset should + * be the next message your application will consume, i.e. lastProcessedMessageOffset + 1. + *

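A condensed sketch of the consume-transform-produce pattern referenced here; topic names, the group id, and the transformation are placeholders, and abort/error handling is omitted:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class ConsumeTransformProduceSketch {
        // consumer: group.id="my-group", enable.auto.commit=false; producer: transactional.id configured
        static void run(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> producer) {
            consumer.subscribe(Collections.singletonList("input-topic"));
            producer.initTransactions();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                if (records.isEmpty()) {
                    continue;
                }
                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    producer.send(new ProducerRecord<>("output-topic", record.key(), record.value().toUpperCase()));
                    // Commit position is the *next* offset to read: lastProcessedMessageOffset + 1.
                    offsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                }
                // The consumed offsets become part of the same transaction as the produced records.
                producer.sendOffsetsToTransaction(offsets, "my-group");
                producer.commitTransaction();
            }
        }
    }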
* This method should be used when you need to batch consumed and produced messages - * together, typically in a consume-transform-produce pattern. + * together, typically in a consume-transform-produce pattern. Thus, the specified + * {@code consumerGroupId} should be the same as config parameter {@code group.id} of the used + * {@link KafkaConsumer consumer}. Note, that the consumer should have {@code enable.auto.commit=false} + * and should also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or + * {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits). * * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index 1e77633..4982033 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -63,43 +63,38 @@ public interface Producer extends Closeable { void abortTransaction() throws ProducerFencedException; /** - * Send the given record asynchronously and return a future which will eventually contain the response information. - * - * @param record The record to send - * @return A future which will eventually contain the response information + * See {@link KafkaProducer#send(ProducerRecord)} */ - public Future send(ProducerRecord record); + Future send(ProducerRecord record); /** - * Send a record and invoke the given callback when the record has been acknowledged by the server + * See {@link KafkaProducer#send(ProducerRecord, Callback)} */ - public Future send(ProducerRecord record, Callback callback); + Future send(ProducerRecord record, Callback callback); /** - * Flush any accumulated records from the producer. Blocks until all sends are complete. + * See {@link KafkaProducer#flush()} */ - public void flush(); + void flush(); /** - * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change - * over time so this list should not be cached. + * See {@link KafkaProducer#partitionsFor(String)} */ - public List partitionsFor(String topic); + List partitionsFor(String topic); /** - * Return a map of metrics maintained by the producer + * See {@link KafkaProducer#metrics()} */ - public Map metrics(); + Map metrics(); /** - * Close this producer + * See {@link KafkaProducer#close()} */ - public void close(); + void close(); /** - * Tries to close the producer cleanly within the specified timeout. If the close does not complete within the - * timeout, fail any pending send requests and force close the producer. 
+ * See {@link KafkaProducer#close(long, TimeUnit)} */ - public void close(long timeout, TimeUnit unit); + void close(long timeout, TimeUnit unit); } http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java index 1cd5b24..ef53b89 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java @@ -29,7 +29,6 @@ public final class KafkaMetric implements Metric { private MetricConfig config; KafkaMetric(Object lock, MetricName metricName, Measurable measurable, MetricConfig config, Time time) { - super(); this.metricName = metricName; this.lock = lock; this.measurable = measurable; http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/core/src/main/scala/kafka/tools/StreamsResetter.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 7ee5424..9cf0e5c 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -22,11 +22,9 @@ import joptsimple.OptionParser; import joptsimple.OptionSet; import joptsimple.OptionSpec; import joptsimple.OptionSpecBuilder; - import kafka.admin.AdminClient; import kafka.admin.TopicCommand; import kafka.utils.ZkUtils; - import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index b31a3e3..5aec3c5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; @@ -40,6 +41,7 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.processor.ThreadMetadata; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; @@ -52,7 +54,6 @@ import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidat import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.QueryableStoreType; import 
org.apache.kafka.streams.state.StreamsMetadata; -import org.apache.kafka.streams.processor.ThreadMetadata; import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider; import org.apache.kafka.streams.state.internals.QueryableStoreProvider; import org.apache.kafka.streams.state.internals.StateStoreProvider; @@ -328,6 +329,7 @@ public class KafkaStreams { * An app can set a single {@link KafkaStreams.StateListener} so that the app is notified when state changes. * * @param listener a new state listener + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. */ public void setStateListener(final KafkaStreams.StateListener listener) { if (state == State.CREATED) { @@ -342,6 +344,7 @@ public class KafkaStreams { * terminates due to an uncaught exception. * * @param eh the uncaught exception handler for all internal threads; {@code null} deletes the current handler + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. */ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) { if (state == State.CREATED) { @@ -362,6 +365,7 @@ public class KafkaStreams { * processing. * * @param globalStateRestoreListener The listener triggered when {@link StateStore} is being restored. + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. */ public void setGlobalStateRestoreListener(final StateRestoreListener globalStateRestoreListener) { if (state == State.CREATED) { @@ -491,6 +495,7 @@ public class KafkaStreams { * * @param topology the topology specifying the computational logic * @param props properties for {@link StreamsConfig} + * @throws StreamsException if any fatal error occurs */ public KafkaStreams(final Topology topology, final Properties props) { @@ -502,6 +507,7 @@ public class KafkaStreams { * * @param topology the topology specifying the computational logic * @param config the Kafka Streams configuration + * @throws StreamsException if any fatal error occurs */ public KafkaStreams(final Topology topology, final StreamsConfig config) { @@ -515,6 +521,7 @@ public class KafkaStreams { * @param config the Kafka Streams configuration * @param clientSupplier the Kafka clients supplier which provides underlying producer and consumer clients * for the new {@code KafkaStreams} instance + * @throws StreamsException if any fatal error occurs */ public KafkaStreams(final Topology topology, final StreamsConfig config, @@ -524,69 +531,33 @@ public class KafkaStreams { private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, final StreamsConfig config, - final KafkaClientSupplier clientSupplier) { - // create the metrics - final Time time = Time.SYSTEM; - - processId = UUID.randomUUID(); - + final KafkaClientSupplier clientSupplier) throws StreamsException { this.config = config; // The application ID is a required config and hence should always have value final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); - - internalTopologyBuilder.setApplicationId(applicationId); + processId = UUID.randomUUID(); String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG); - if (clientId.length() <= 0) + if (clientId.length() <= 0) { clientId = applicationId + "-" + processId; + } - this.logPrefix = String.format("stream-client [%s] ", clientId); - + this.logPrefix = String.format("stream-client [%s]", clientId); final LogContext 
logContext = new LogContext(logPrefix); - this.log = logContext.logger(getClass()); + final String cleanupThreadName = clientId + "-CleanupThread"; - final List reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class); - reporters.add(new JmxReporter(JMX_PREFIX)); - - final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) - .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) - .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), - TimeUnit.MILLISECONDS); - - metrics = new Metrics(metricConfig, reporters, time); - - threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)]; - final Map threadState = new HashMap<>(threads.length); - GlobalStreamThread.State globalThreadState = null; - - final ArrayList storeProviders = new ArrayList<>(); - streamsMetadataState = new StreamsMetadataState(internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG))); - - final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); + internalTopologyBuilder.setApplicationId(applicationId); + // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception + internalTopologyBuilder.build(null); - if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) { + long cacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG); + if (cacheSize < 0) { + cacheSize = 0; log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes."); } - final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) / - (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + (globalTaskTopology == null ? 
0 : 1))); - - stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time); - if (globalTaskTopology != null) { - final String globalThreadId = clientId + "-GlobalStreamThread"; - globalStreamThread = new GlobalStreamThread(globalTaskTopology, - config, - clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")), - stateDirectory, - metrics, - time, - globalThreadId); - globalThreadState = globalStreamThread.state(); - } - final StateRestoreListener delegatingStateRestoreListener = new StateRestoreListener() { @Override public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) { @@ -610,6 +581,44 @@ public class KafkaStreams { } }; + threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)]; + try { + stateDirectory = new StateDirectory( + applicationId, + config.getString(StreamsConfig.STATE_DIR_CONFIG), + Time.SYSTEM); + } catch (final ProcessorStateException fatal) { + throw new StreamsException(fatal); + } + streamsMetadataState = new StreamsMetadataState( + internalTopologyBuilder, + parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG))); + + final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) + .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) + .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), + TimeUnit.MILLISECONDS); + final List reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class); + reporters.add(new JmxReporter(JMX_PREFIX)); + metrics = new Metrics(metricConfig, reporters, Time.SYSTEM); + + GlobalStreamThread.State globalThreadState = null; + final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); + if (globalTaskTopology != null) { + final String globalThreadId = clientId + "-GlobalStreamThread"; + globalStreamThread = new GlobalStreamThread(globalTaskTopology, + config, + clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")), + stateDirectory, + metrics, + Time.SYSTEM, + globalThreadId); + globalThreadState = globalStreamThread.state(); + } + + final Map threadState = new HashMap<>(threads.length); + final ArrayList storeProviders = new ArrayList<>(); for (int i = 0; i < threads.length; i++) { threads[i] = StreamThread.create(internalTopologyBuilder, config, @@ -617,14 +626,15 @@ public class KafkaStreams { processId, clientId, metrics, - time, + Time.SYSTEM, streamsMetadataState, - cacheSizeBytes, + cacheSize / (threads.length + (globalTaskTopology == null ? 
0 : 1)), stateDirectory, delegatingStateRestoreListener); threadState.put(threads[i].getId(), threads[i].state()); storeProviders.add(new StreamThreadStateStoreProvider(threads[i])); } + final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState); if (globalTaskTopology != null) { globalStreamThread.setStateListener(streamStateListener); @@ -635,7 +645,6 @@ public class KafkaStreams { final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores()); queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); - final String cleanupThreadName = clientId + "-CleanupThread"; stateDirCleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(final Runnable r) { @@ -690,7 +699,8 @@ public class KafkaStreams { * {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}. * @throws IllegalStateException if process was already started - * @throws StreamsException if the Kafka brokers have version 0.10.0.x + * @throws StreamsException if the Kafka brokers have version 0.10.0.x or + * if {@link StreamsConfig#PROCESSING_GUARANTEE_CONFIG exactly-once} is enabled for pre 0.11.0.x brokers */ public synchronized void start() throws IllegalStateException, StreamsException { log.debug("Starting Streams client"); @@ -858,7 +868,7 @@ public class KafkaStreams { *

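The state-related constraints documented in the KafkaStreams changes above (listeners and handlers only while in CREATED state, cleanUp() only while not running) suggest a lifecycle roughly like this sketch; application id, bootstrap servers, and topic names are placeholders:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class StreamsLifecycleSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "lifecycle-sketch");   // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic");                     // placeholder topology
            Topology topology = builder.build();

            KafkaStreams streams = new KafkaStreams(topology, props);
            // Handlers and listeners must be registered before start(), i.e. while the state is CREATED.
            streams.setUncaughtExceptionHandler((thread, throwable) -> System.err.println(throwable));

            streams.cleanUp();      // allowed here: the instance is not running yet
            streams.start();
            Thread.sleep(10_000L);  // let the application run for a bit
            streams.close();        // cleanUp() could be invoked again after close()
        }
    }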
* Calling this method triggers a restore of local {@link StateStore}s on the next {@link #start() application start}. * - * @throws IllegalStateException if the instance is currently running + * @throws IllegalStateException if this {@code KafkaStreams} instance is currently {@link State#RUNNING running} */ public void cleanUp() { if (isRunning()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 446f941..cde416d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -342,10 +342,10 @@ public class StreamsConfig extends AbstractConfig { Importance.MEDIUM, CLIENT_ID_DOC) .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, - Type.CLASS, - LogAndFailExceptionHandler.class.getName(), - Importance.MEDIUM, - DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) + Type.CLASS, + LogAndFailExceptionHandler.class.getName(), + Importance.MEDIUM, + DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) .define(DEFAULT_KEY_SERDE_CLASS_CONFIG, Type.CLASS, Serdes.ByteArraySerde.class.getName(), http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index 3c8e077..9e853fd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -46,12 +46,11 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte final StateManager stateManager; public AbstractProcessorContext(final TaskId taskId, - final String applicationId, - final StreamsConfig config, - final StreamsMetrics metrics, - final StateManager stateManager, - final ThreadCache cache) { - + final String applicationId, + final StreamsConfig config, + final StreamsMetrics metrics, + final StateManager stateManager, + final ThreadCache cache) { this.taskId = taskId; this.applicationId = applicationId; this.config = config; @@ -93,7 +92,9 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte } @Override - public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) { + public void register(final StateStore store, + final boolean loggingEnabled, + final StateRestoreCallback stateRestoreCallback) { if (initialized) { throw new IllegalStateException("Can only create state stores during initialization."); } http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 
5ed9aae..6734da6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -226,6 +226,7 @@ public abstract class AbstractTask implements Task { * @throws ProcessorStateException if there is an error while closing the state manager * @param writeCheckpoint boolean indicating if a checkpoint file should be written */ + // visible for testing void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException { ProcessorStateException exception = null; log.trace("Closing state manager"); http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 2d886b7..3208f93 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -137,7 +137,7 @@ class AssignedTasks { restoredPartitions.addAll(restored); for (final Iterator> it = restoring.entrySet().iterator(); it.hasNext(); ) { final Map.Entry entry = it.next(); - Task task = entry.getValue(); + final Task task = entry.getValue(); if (restoredPartitions.containsAll(task.changelogPartitions())) { transitionToRunning(task); resume.addAll(task.partitions()); @@ -303,11 +303,12 @@ class AssignedTasks { builder.append("\n"); } - private List allInitializedTasks() { + private List allTasks() { final List tasks = new ArrayList<>(); tasks.addAll(running.values()); tasks.addAll(suspended.values()); tasks.addAll(restoring.values()); + tasks.addAll(created.values()); return tasks; } @@ -428,8 +429,7 @@ class AssignedTasks { } void close(final boolean clean) { - close(allInitializedTasks(), clean); - close(created.values(), clean); + close(allTasks(), clean); clear(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index d47af88..81d2f6c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -1744,7 +1744,7 @@ public class InternalTopologyBuilder { private String subtopologiesAsString() { final StringBuilder sb = new StringBuilder(); - sb.append("Sub-topologies: \n"); + sb.append("Sub-topologies:\n"); if (subtopologies.isEmpty()) { sb.append(" none\n"); } else { http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index d26511c..889b6d8 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -42,7 +42,6 @@ public class RecordQueue { private final ArrayDeque fifoQueue; private final TimestampTracker> timeTracker; private final SourceNodeRecordDeserializer recordDeserializer; - private final DeserializationExceptionHandler deserializationExceptionHandler; private final ProcessorContext processorContext; private long partitionTime = TimestampTracker.NOT_KNOWN; @@ -58,11 +57,9 @@ public class RecordQueue { this.fifoQueue = new ArrayDeque<>(); this.timeTracker = new MinTimestampTracker<>(); this.recordDeserializer = new SourceNodeRecordDeserializer(source, deserializationExceptionHandler); - this.deserializationExceptionHandler = deserializationExceptionHandler; this.processorContext = processorContext; } - /** * Returns the corresponding source node in the topology * @@ -87,15 +84,15 @@ public class RecordQueue { * @param rawRecords the raw records * @return the size of this queue */ - public int addRawRecords(Iterable> rawRecords) { - for (ConsumerRecord rawRecord : rawRecords) { + int addRawRecords(final Iterable> rawRecords) { + for (final ConsumerRecord rawRecord : rawRecords) { - ConsumerRecord record = recordDeserializer.tryDeserialize(processorContext, rawRecord); + final ConsumerRecord record = recordDeserializer.tryDeserialize(processorContext, rawRecord); if (record == null) { continue; } - long timestamp = timestampExtractor.extract(record, timeTracker.get()); + final long timestamp = timestampExtractor.extract(record, timeTracker.get()); log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record); // drop message if TS is invalid, i.e., negative @@ -103,7 +100,7 @@ public class RecordQueue { continue; } - StampedRecord stampedRecord = new StampedRecord(record, timestamp); + final StampedRecord stampedRecord = new StampedRecord(record, timestamp); fifoQueue.addLast(stampedRecord); timeTracker.addElement(stampedRecord); } @@ -111,10 +108,11 @@ public class RecordQueue { // update the partition timestamp if its currently // tracked min timestamp has exceed its value; this will // usually only take effect for the first added batch - long timestamp = timeTracker.get(); + final long timestamp = timeTracker.get(); - if (timestamp > partitionTime) + if (timestamp > partitionTime) { partitionTime = timestamp; + } return size(); } @@ -125,19 +123,21 @@ public class RecordQueue { * @return StampedRecord */ public StampedRecord poll() { - StampedRecord elem = fifoQueue.pollFirst(); + final StampedRecord elem = fifoQueue.pollFirst(); - if (elem == null) + if (elem == null) { return null; + } timeTracker.removeElement(elem); // only advance the partition timestamp if its currently // tracked min timestamp has exceeded its value - long timestamp = timeTracker.get(); + final long timestamp = timeTracker.get(); - if (timestamp > partitionTime) + if (timestamp > partitionTime) { partitionTime = timestamp; + } return elem; } http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java index 
1d9e722..7fde881 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java @@ -63,12 +63,12 @@ class SourceNodeRecordDeserializer implements RecordDeserializer { } public ConsumerRecord tryDeserialize(final ProcessorContext processorContext, - ConsumerRecord rawRecord) { + final ConsumerRecord rawRecord) { // catch and process if we have a deserialization handler try { return deserialize(rawRecord); - } catch (Exception e) { + } catch (final Exception e) { final DeserializationExceptionHandler.DeserializationHandlerResponse response = deserializationExceptionHandler.handle(processorContext, rawRecord, e); if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) { http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 033af24..98ec810 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -63,6 +63,15 @@ public class StandbyTask extends AbstractTask { processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics); } + @Override + public boolean initialize() { + initializeStateStores(); + checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed()); + processorContext.initialized(); + taskInitialized = true; + return true; + } + /** *

      * - update offset limits
@@ -139,16 +148,6 @@ public class StandbyTask extends AbstractTask {
     }
 
     @Override
-    public boolean maybePunctuateStreamTime() {
-        throw new UnsupportedOperationException("maybePunctuateStreamTime not supported by StandbyTask");
-    }
-
-    @Override
-    public boolean maybePunctuateSystemTime() {
-        throw new UnsupportedOperationException("maybePunctuateSystemTime not supported by StandbyTask");
-    }
-
-    @Override
     public boolean commitNeeded() {
         return false;
     }
@@ -174,16 +173,18 @@ public class StandbyTask extends AbstractTask {
     }
 
     @Override
-    public boolean process() {
-        throw new UnsupportedOperationException("process not supported by StandbyTasks");
+    public boolean maybePunctuateStreamTime() {
+        throw new UnsupportedOperationException("maybePunctuateStreamTime not supported by StandbyTask");
     }
 
-    public boolean initialize() {
-        initializeStateStores();
-        checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
-        processorContext.initialized();
-        taskInitialized = true;
-        return true;
+    @Override
+    public boolean maybePunctuateSystemTime() {
+        throw new UnsupportedOperationException("maybePunctuateSystemTime not supported by StandbyTask");
+    }
+
+    @Override
+    public boolean process() {
+        throw new UnsupportedOperationException("process not supported by StandbyTasks");
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 8ecc7e2..caa0100 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -185,7 +185,9 @@ public class StoreChangelogReader implements ChangelogReader {
         needsRestoring.putAll(initialized);
     }
 
-    private void logRestoreOffsets(final TopicPartition partition, final long startingOffset, final Long endOffset) {
+    private void logRestoreOffsets(final TopicPartition partition,
+                                   final long startingOffset,
+                                   final Long endOffset) {
         log.debug("Restoring partition {} from offset {} to endOffset {}",
                   partition,
                   startingOffset,
@@ -229,7 +231,7 @@ public class StoreChangelogReader implements ChangelogReader {
     }
 
     private void restorePartition(final ConsumerRecords allRecords,
-                                    final TopicPartition topicPartition) {
+                                  final TopicPartition topicPartition) {
         final StateRestorer restorer = stateRestorers.get(topicPartition);
         final Long endOffset = endOffsets.get(topicPartition);
         final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
@@ -255,7 +257,8 @@ public class StoreChangelogReader implements ChangelogReader {
     }
 
     private long processNext(final List> records,
-                             final StateRestorer restorer, final Long endOffset) {
+                             final StateRestorer restorer,
+                             final Long endOffset) {
         final List> restoreRecords = new ArrayList<>();
         long nextPosition = -1;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 084a991..0830aa2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -131,7 +131,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         // initialize the topology with its own context
         processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
 
-        final TimestampExtractor defaultTimestampExtractor  = config.defaultTimestampExtractor();
+        final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
         final DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler();
         for (final TopicPartition partition : partitions) {
             final SourceNode source = topology.source(partition.topic());
@@ -151,6 +151,16 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         }
     }
 
+    public boolean initialize() {
+        log.debug("Initializing");
+        initializeStateStores();
+        initTopology();
+        processorContext.initialized();
+        taskInitialized = true;
+        return topology.stateStores().isEmpty();
+    }
+
+
     /**
      * <pre>
      * - re-initialize the task
@@ -597,13 +607,4 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         return new RecordCollectorImpl(producer, id.toString(), logContext);
     }
 
-    public boolean initialize() {
-        log.debug("Initializing");
-        initializeStateStores();
-        initTopology();
-        processorContext.initialized();
-        taskInitialized = true;
-        return topology.stateStores().isEmpty();
-    }
-
 }

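The relocated StreamTask#initialize() above returns topology.stateStores().isEmpty(). A minimal sketch (the surrounding caller is hypothetical) of how that boolean can be read:

    // hypothetical caller: true means the task has no local state to restore
    final boolean readyToRun = task.initialize();
    if (readyToRun) {
        // no state stores, so the task can start processing right away
    } else {
        // state stores present, so changelog restoration has to finish first
    }
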
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 867359b..e141c46 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -421,7 +421,6 @@ public class StreamThread extends Thread implements ThreadDataProvider {
             }
 
             return threadProducer;
-
         }
 
         @Override
@@ -456,20 +455,29 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         }
 
         @Override
-        StandbyTask createTask(final Consumer<byte[], byte[]> consumer, final TaskId taskId, final Set<TopicPartition> partitions) {
+        StandbyTask createTask(final Consumer<byte[], byte[]> consumer,
+                               final TaskId taskId,
+                               final Set<TopicPartition> partitions) {
             taskCreatedSensor.record();
 
             final ProcessorTopology topology = builder.build(taskId.topicGroupId);
 
             if (!topology.stateStores().isEmpty()) {
-                return new StandbyTask(taskId, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory);
+                return new StandbyTask(taskId,
+                                       applicationId,
+                                       partitions,
+                                       topology,
+                                       consumer,
+                                       storeChangelogReader,
+                                       config,
+                                       streamsMetrics,
+                                       stateDirectory);
             } else {
-                log.trace("Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", taskId, partitions);
-
+                log.trace("Skipped standby task {} with assigned partitions {} " +
+                    "since it does not have any state stores to materialize", taskId, partitions);
                 return null;
             }
         }
-
     }
 
     /**
@@ -781,8 +789,10 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                 final long processLatency = computeLatency();
                 streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed,
                                                         timerStartedMs);
-                processedBeforeCommit = adjustRecordsProcessedBeforeCommit(recordsProcessedBeforeCommit, totalProcessed,
-                                                                                  processLatency, commitTimeMs);
+                processedBeforeCommit = adjustRecordsProcessedBeforeCommit(recordsProcessedBeforeCommit,
+                                                                           totalProcessed,
+                                                                           processLatency,
+                                                                           commitTimeMs);
             }
         }
 

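Since createTask() above returns null when the standby topology has no state stores to materialize, callers need to null-check the result. A minimal sketch (the bookkeeping map is hypothetical):

    // hypothetical caller: only track standby tasks that were actually created
    final StandbyTask task = createTask(consumer, taskId, partitions);
    if (task != null) {
        createdStandbyTasks.put(taskId, task);
    }
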
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 7afbecf..f12ed91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -110,7 +110,7 @@ class TaskManager {
                         newTasks.put(taskId, partitions);
                     }
                 } catch (final StreamsException e) {
-                    log.error("Failed to create an active task {} due to the following error:", taskId, e);
+                    log.error("Failed to resume an active task {} due to the following error:", taskId, e);
                     throw e;
                 }
             } else {
@@ -122,6 +122,7 @@ class TaskManager {
             return;
         }
 
+        // CANNOT FIND RETRY AND BACKOFF LOGIC
         // create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
         // -> other thread will call removeSuspendedTasks(); eventually
         log.trace("New active tasks to be created: {}", newTasks);
@@ -185,24 +186,13 @@ class TaskManager {
         firstException.compareAndSet(null, active.suspend());
         firstException.compareAndSet(null, standby.suspend());
         // remove the changelog partitions from restore consumer
-        firstException.compareAndSet(null, unAssignChangeLogPartitions());
+        restoreConsumer.assign(Collections.emptyList());
 
         if (firstException.get() != null) {
             throw new StreamsException(logPrefix + "failed to suspend stream tasks", firstException.get());
         }
     }
 
-    private RuntimeException unAssignChangeLogPartitions() {
-        try {
-            // un-assign the change log partitions
-            restoreConsumer.assign(Collections.emptyList());
-        } catch (final RuntimeException e) {
-            log.error("Failed to un-assign change log partitions due to the following error:", e);
-            return e;
-        }
-        return null;
-    }
-
     void shutdown(final boolean clean) {
         log.debug("Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", active.runningTaskIds(), standby.runningTaskIds(),
                   active.previousTaskIds(), standby.previousTaskIds());
@@ -215,9 +205,9 @@ class TaskManager {
             log.error("Failed to close KafkaStreamClient due to the following error:", e);
         }
         // remove the changelog partitions from restore consumer
-        unAssignChangeLogPartitions();
+        restoreConsumer.assign(Collections.emptyList());
         taskCreator.close();
-
+        standbyTaskCreator.close();
     }
 
     Set<TaskId> suspendedActiveTaskIds() {