kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-4829: Improve log4j on Streams thread / task-level
Date Sat, 24 Jun 2017 01:03:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 8abac1984 -> 59b57212a


KAFKA-4829: Improve log4j on Streams thread / task-level

This patch makes the following improvements:

1. On the stream thread level, INFO now reports summaries such as `Completed xx tasks in yy ms` or `Completed rebalance with xx state in yy ms`.
2. On the stream thread cache level, INFO reports `Flushed xx records`.
3. On the stream thread level, DEBUG covers internal batched operations such as `created xx tasks`, while TRACE covers individual operations such as `created x task`.
4. `isTraceEnabled` is checked on the critical path to avoid the overhead of creating the `Object[]` for log arguments; the pattern is sketched below.
5. Minor cleanups in the code.
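
For illustration, a minimal sketch of the guard pattern from item 4, assuming an
SLF4J logger; the class name, `logPrefix` value, and `process` method below are
hypothetical and not taken from this commit:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class GuardedTraceSketch {
        private static final Logger log = LoggerFactory.getLogger(GuardedTraceSketch.class);
        private final String logPrefix = "task [0_1]"; // hypothetical prefix

        void process(final Object record) {
            // Without the guard, the varargs call allocates an Object[] at the call
            // site even when TRACE is disabled; the guard skips that allocation.
            if (log.isTraceEnabled()) {
                log.trace("{} Start processing one record [{}]", logPrefix, record);
            }
        }
    }

To surface the new detail, a hypothetical log4j.properties fragment (also not part
of this commit) could raise the Streams internals to TRACE while keeping the rest
of the application at INFO:

    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
    log4j.logger.org.apache.kafka.streams.processor.internals=TRACE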

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Steven Schlansker, Nicolas Fouché, Kamal C, Ismael Juma, Bill Bejeck, Eno Thereska,
Matthias J. Sax, Damian Guy

Closes #3354 from guozhangwang/K4829-tasks-log4j

(cherry picked from commit d5e463b9de1f92b0dcc8fea4cf74a0eaca835ef7)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/59b57212
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/59b57212
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/59b57212

Branch: refs/heads/0.11.0
Commit: 59b57212ad2ccee27b27a259976f2288e4227fdd
Parents: 8abac19
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Fri Jun 23 18:01:41 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Jun 23 18:03:22 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/AbstractTask.java       | 12 ++-
 .../processor/internals/ProcessorNode.java      |  3 -
 .../internals/ProcessorStateManager.java        |  4 +-
 .../processor/internals/StandbyTask.java        |  2 +-
 .../processor/internals/StateDirectory.java     |  6 +-
 .../internals/StoreChangelogReader.java         |  8 +-
 .../internals/StreamPartitionAssignor.java      |  4 +-
 .../streams/processor/internals/StreamTask.java | 24 ++++--
 .../processor/internals/StreamThread.java       | 87 ++++++++++----------
 .../streams/state/internals/NamedCache.java     | 13 +--
 .../streams/state/internals/ThreadCache.java    | 16 ++--
 11 files changed, 93 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/59b57212/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index d97f8f9..02129ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -168,11 +168,15 @@ public abstract class AbstractTask {
     }
 
     protected void updateOffsetLimits() {
-        log.debug("{} Updating store offset limits {}", logPrefix);
         for (final TopicPartition partition : partitions) {
             try {
                 final OffsetAndMetadata metadata = consumer.committed(partition); // TODO: batch API?
-                stateMgr.putOffsetLimit(partition, metadata != null ? metadata.offset() : 0L);
+                final long offset = metadata != null ? metadata.offset() : 0L;
+                stateMgr.putOffsetLimit(partition, offset);
+
+                if (log.isTraceEnabled()) {
+                    log.trace("{} Updating store offset limits {} for changelog {}", logPrefix, offset, partition);
+                }
             } catch (final AuthorizationException e) {
                 throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partition), e);
             } catch (final WakeupException e) {
@@ -191,13 +195,13 @@ public abstract class AbstractTask {
     }
 
     void initializeStateStores() {
-        log.debug("{} Initializing state stores", logPrefix);
+        log.trace("{} Initializing state stores", logPrefix);
 
         // set initial offset limits
         updateOffsetLimits();
 
         for (final StateStore store : topology.stateStores()) {
-            log.trace("task [{}] Initializing store {}", id(), store.name());
+            log.trace("{} Initializing store {}", logPrefix, store.name());
             store.init(processorContext, store);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/59b57212/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index dbda062..8112614 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -165,7 +165,6 @@ public class ProcessorNode<K, V> {
 
     protected static final class NodeMetrics  {
         final StreamsMetricsImpl metrics;
-        final String metricGrpName;
         final Map<String, String> metricTags;
 
         final Sensor nodeProcessTimeSensor;
@@ -180,7 +179,6 @@ public class ProcessorNode<K, V> {
             final String tagKey = "processor-node-id";
             final String tagValue = name;
             this.metrics = (StreamsMetricsImpl) metrics;
-            this.metricGrpName = "stream-processor-node-metrics";
             this.metricTags = new LinkedHashMap<>();
             this.metricTags.put(tagKey, tagValue);
 
@@ -190,7 +188,6 @@ public class ProcessorNode<K, V> {
             this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "create", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
             this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "destroy", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
             this.sourceNodeForwardSensor = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "forward", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-
         }
 
         public void removeAllSensors() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/59b57212/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 1a89fab..226d7eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -119,7 +119,7 @@ public class ProcessorStateManager implements StateManager {
             checkpoint = null;
         }
 
-        log.info("{} Created state store manager for task {} with the acquired state dir
lock", logPrefix, taskId);
+        log.debug("{} Created state store manager for task {} with the acquired state dir
lock", logPrefix, taskId);
     }
 
 
@@ -349,7 +349,7 @@ public class ProcessorStateManager implements StateManager {
     }
 
     void registerGlobalStateStores(final List<StateStore> stateStores) {
-        log.info("{} Register global stores {}", logPrefix, stateStores);
+        log.debug("{} Register global stores {}", logPrefix, stateStores);
         for (final StateStore stateStore : stateStores) {
             globalStores.put(stateStore.name(), stateStore);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/59b57212/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 5998c2b..8d518ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -140,7 +140,7 @@ public class StandbyTask extends AbstractTask {
      */
     public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition,
                                                        final List<ConsumerRecord<byte[], byte[]>> records) {
-        log.debug("{} Updating standby replicas of its state store for partition [{}]", logPrefix, partition);
+        log.trace("{} Updating standby replicas of its state store for partition [{}]", logPrefix, partition);
         return stateMgr.updateStandbyStates(partition, records);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/59b57212/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 78e3b9c..3e547eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -216,8 +216,10 @@ public class StateDirectory {
             if (!locks.containsKey(id)) {
                 try {
                     if (lock(id, 0)) {
-                        if (time.milliseconds() > taskDir.lastModified() + cleanupDelayMs) {
-                            log.info("{} Deleting obsolete state directory {} for task {} as cleanup delay of {} ms has passed", logPrefix, dirName, id, cleanupDelayMs);
+                        long now = time.milliseconds();
+                        long lastModifiedMs = taskDir.lastModified();
+                        if (now > lastModifiedMs + cleanupDelayMs) {
+                            log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix, dirName, id, now - lastModifiedMs, cleanupDelayMs);
                             Utils.delete(taskDir);
                         }
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/59b57212/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 7a683f2..3d5a793 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -87,7 +87,7 @@ public class StoreChangelogReader implements ChangelogReader {
             throw new StreamsException(String.format("Store %s's change log (%s) does not contain partition %s",
                                                      storeName, topicPartition.topic(), topicPartition.partition()));
         }
-        log.debug("{} Took {} ms to validate that partition {} exists", logPrefix, time.milliseconds() - start, topicPartition);
+        log.debug("{} Took {}ms to validate that partition {} exists", logPrefix, time.milliseconds() - start, topicPartition);
     }
 
     @Override
@@ -99,6 +99,7 @@ public class StoreChangelogReader implements ChangelogReader {
 
     public void restore() {
         final long start = time.milliseconds();
+        final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
         try {
             if (!consumer.subscription().isEmpty()) {
                 throw new IllegalStateException(String.format("Restore consumer should have
not subscribed to any partitions (%s) beforehand", consumer.subscription()));
@@ -106,7 +107,6 @@ public class StoreChangelogReader implements ChangelogReader {
             final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(stateRestorers.keySet());
 
             // remove any partitions where we already have all of the data
-            final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
             for (final Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
                 TopicPartition topicPartition = entry.getKey();
                 Long offset = entry.getValue();
@@ -118,7 +118,7 @@ public class StoreChangelogReader implements ChangelogReader {
                 }
             }
 
-            log.info("{} Starting restoring state stores from changelog topics {}", logPrefix,
needsRestoring.keySet());
+            log.debug("{} Starting restoring state stores from changelog topics {}", logPrefix,
needsRestoring.keySet());
 
             consumer.assign(needsRestoring.keySet());
             final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
@@ -153,7 +153,7 @@ public class StoreChangelogReader implements ChangelogReader {
             }
         } finally {
             consumer.assign(Collections.<TopicPartition>emptyList());
-            log.debug("{} Took {} ms to restore all active states", logPrefix, time.milliseconds()
- start);
+            log.info("{} Completed restore all active states from changelog topics {} in
{}ms ", logPrefix, needsRestoring.keySet(), time.milliseconds() - start);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/59b57212/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 9d5d4cc..0a1b2ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -313,7 +313,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             clientMetadata.addConsumer(consumerId, info);
         }
 
-        log.info("stream-thread [{}] Constructed client metadata {} from the member subscriptions.",
streamThread.getName(), clientsMetadata);
+        log.debug("stream-thread [{}] Constructed client metadata {} from the member subscriptions.",
streamThread.getName(), clientsMetadata);
 
         // ---------------- Step Zero ---------------- //
 
@@ -669,7 +669,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             }
         }
 
-        log.info("stream-thread [{}] Completed validating internal topics in partition assignor",
streamThread.getName());
+        log.debug("stream-thread [{}] Completed validating internal topics in partition assignor",
streamThread.getName());
     }
 
     private boolean allTopicsCreated(final Set<String> topicNamesToMakeReady, final Map<InternalTopicConfig, Integer> topicsToMakeReady) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/59b57212/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8355638..55e1ffe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -185,6 +185,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
             final TopicPartition partition = recordInfo.partition();
 
             log.trace("{} Start processing one record [{}]", logPrefix, record);
+
             updateProcessorContext(record, currNode);
             currNode.process(record.key(), record.value());
 
@@ -225,7 +226,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
         updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), node);
 
-        log.trace("{} Punctuating processor {} with timestamp {}", logPrefix, node.name(),
timestamp);
+        if (log.isTraceEnabled()) {
+            log.trace("{} Punctuating processor {} with timestamp {}", logPrefix, node.name(),
timestamp);
+        }
 
         try {
             node.punctuate(timestamp);
@@ -251,11 +254,12 @@ public class StreamTask extends AbstractTask implements Punctuator {
     @Override
     public void commit() {
         commitImpl(true);
+
     }
 
     // visible for testing
     void commitImpl(final boolean startNewTransaction) {
-        log.trace("{} Committing", logPrefix);
+        log.debug("{} Committing", logPrefix);
         metrics.metrics.measureLatencyNs(
             time,
             new Runnable() {
@@ -269,6 +273,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
                 }
             },
             metrics.taskCommitTimeSensor);
+
+        commitRequested = false;
     }
 
     @Override
@@ -285,7 +291,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
     private void commitOffsets(final boolean startNewTransaction) {
         if (commitOffsetNeeded) {
-            log.debug("{} Committing offsets", logPrefix);
+            log.trace("{} Committing offsets", logPrefix);
             final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
             for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
                 final TopicPartition partition = entry.getKey();
@@ -315,13 +321,11 @@ public class StreamTask extends AbstractTask implements Punctuator {
             producer.commitTransaction();
             transactionInFlight = false;
         }
-
-        commitRequested = false;
     }
 
     private void initTopology() {
         // initialize the task by initializing all its processor nodes in the topology
-        log.debug("{} Initializing processor nodes of the topology", logPrefix);
+        log.trace("{} Initializing processor nodes of the topology", logPrefix);
         for (final ProcessorNode node : topology.processors()) {
             processorContext.setCurrentNode(node);
             try {
@@ -343,6 +347,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      */
     @Override
     public void suspend() {
+        log.debug("{} Suspending", logPrefix);
         suspend(true);
     }
 
@@ -356,7 +361,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * </pre>
      */
     private void suspend(final boolean clean) {
-        log.debug("{} Suspending", logPrefix);
         closeTopology(); // should we call this only on clean suspend?
         if (clean) {
             commitImpl(false);
@@ -364,7 +368,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
     }
 
     private void closeTopology() {
-        log.debug("{} Closing processor topology", logPrefix);
+        log.trace("{} Closing processor topology", logPrefix);
 
         partitionGroup.clear();
 
@@ -463,7 +467,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
         final int oldQueueSize = partitionGroup.numBuffered(partition);
         final int newQueueSize = partitionGroup.addRawRecords(partition, records);
 
-        log.trace("{} Added records into the buffered queue of partition {}, new queue size
is {}", logPrefix, partition, newQueueSize);
+        if (log.isTraceEnabled()) {
+            log.trace("{} Added records into the buffered queue of partition {}, new queue
size is {}", logPrefix, partition, newQueueSize);
+        }
 
         // if after adding these records, its partition queue's buffered size has been
         // increased beyond the threshold, we can then pause the consumption for this partition

http://git-wip-us.apache.org/repos/asf/kafka/blob/59b57212/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 46697c1..3fd7832 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -157,7 +157,7 @@ public class StreamThread extends Thread {
 
         @Override
         public void onPartitionsAssigned(final Collection<TopicPartition> assignment) {
-            log.info("{} at state {}: new partitions {} assigned at the end of consumer rebalance.\n" +
+            log.debug("{} at state {}: new partitions {} assigned at the end of consumer rebalance.\n" +
                     "\tassigned active tasks: {}\n" +
                     "\tassigned standby tasks: {}\n" +
                     "\tcurrent suspended active tasks: {}\n" +
@@ -192,17 +192,19 @@ public class StreamThread extends Thread {
             } finally {
                 log.info("{} partition assignment took {} ms.\n" +
                         "\tcurrent active tasks: {}\n" +
-                        "\tcurrent standby tasks: {}",
+                        "\tcurrent standby tasks: {}\n" +
+                        "\tprevious active tasks: {}\n",
                     logPrefix,
                     time.milliseconds() - start,
                     activeTasks.keySet(),
-                    standbyTasks.keySet());
+                    standbyTasks.keySet(),
+                    prevActiveTasks);
             }
         }
 
         @Override
         public void onPartitionsRevoked(final Collection<TopicPartition> assignment) {
-            log.info("{} at state {}: partitions {} revoked at the beginning of consumer rebalance.\n" +
+            log.debug("{} at state {}: partitions {} revoked at the beginning of consumer rebalance.\n" +
                     "\tcurrent assigned active tasks: {}\n" +
                     "\tcurrent assigned standby tasks: {}\n",
                 logPrefix,
@@ -226,13 +228,11 @@ public class StreamThread extends Thread {
 
                 log.info("{} partition revocation took {} ms.\n" +
                         "\tsuspended active tasks: {}\n" +
-                        "\tsuspended standby tasks: {}\n" +
-                        "\tprevious active tasks: {}\n",
+                        "\tsuspended standby tasks: {}",
                     logPrefix,
                     time.milliseconds() - start,
                     suspendedTasks.keySet(),
-                    suspendedStandbyTasks.keySet(),
-                    prevActiveTasks);
+                    suspendedStandbyTasks.keySet());
             }
         }
     }
@@ -242,7 +242,6 @@ public class StreamThread extends Thread {
         void retryWithBackoff(final Map<TaskId, Set<TopicPartition>> tasksToBeCreated, final long start) {
             long backoffTimeMs = 50L;
             final Set<TaskId> retryingTasks = new HashSet<>();
-            long nextLoggingTime = System.currentTimeMillis() + 10000;
             while (true) {
                 final Iterator<Map.Entry<TaskId, Set<TopicPartition>>> it = tasksToBeCreated.entrySet().iterator();
                 while (it.hasNext()) {
@@ -254,9 +253,7 @@ public class StreamThread extends Thread {
                         createTask(taskId, partitions);
                         it.remove();
                         backoffTimeMs = 50L;
-                        if (retryingTasks.remove(taskId) && log.isWarnEnabled()) {
-                            log.info("{} Created task {}", logPrefix, taskId);
-                        }
+                        retryingTasks.remove(taskId);
                     } catch (final LockException e) {
                         // ignore and retry
                         if (!retryingTasks.contains(taskId)) {
@@ -277,11 +274,6 @@ public class StreamThread extends Thread {
                 } catch (final InterruptedException e) {
                     // ignore
                 }
-
-                if (System.currentTimeMillis() > nextLoggingTime) {
-                    nextLoggingTime += 10000;
-                    log.warn("{} Still retrying to create tasks: {}", logPrefix, retryingTasks);
-                }
             }
         }
 
@@ -493,7 +485,7 @@ public class StreamThread extends Thread {
         suspendedTasks = new HashMap<>();
         suspendedStandbyTasks = new HashMap<>();
 
-        // standby ktables
+        // standby KTables
         standbyRecords = new HashMap<>();
 
         stateDirectory = new StateDirectory(applicationId, threadClientId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
@@ -565,7 +557,7 @@ public class StreamThread extends Thread {
             }
 
             maybeCommit(timerStartedMs);
-            maybeUpdateStandbyTasks();
+            maybeUpdateStandbyTasks(timerStartedMs);
             maybeClean(timerStartedMs);
         }
         log.info("{} Shutting down at user request", logPrefix);
@@ -710,7 +702,15 @@ public class StreamThread extends Thread {
                 maybePunctuate(task);
                 if (task.commitNeeded()) {
                     name = "commit";
+
+                    long beforeCommitMs = time.milliseconds();
+
                     commitOne(task);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("{} Committed active task {} per user request in {}ms",
+                                logPrefix, task.id(), timerStartedMs - beforeCommitMs);
+                    }
                 }
             }
         });
@@ -758,7 +758,7 @@ public class StreamThread extends Thread {
         } else if (prevRecordsProcessedBeforeCommit != UNLIMITED_RECORDS && processLatency > 0) {
             // push up
             recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency);
-            log.debug("{} processing latency {} > commit time {} for {} records. Adjusting up recordsProcessedBeforeCommit={}",
+            log.debug("{} processing latency {} < commit time {} for {} records. Adjusting up recordsProcessedBeforeCommit={}",
                 logPrefix, processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit);
         }
 
@@ -769,13 +769,19 @@ public class StreamThread extends Thread {
      * Commit all tasks owned by this thread if specified interval time has elapsed
      */
     protected void maybeCommit(final long now) {
-
         if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
-
-            log.debug("{} Committing all active tasks {} and standby tasks {} because the
commit interval {}ms has elapsed by {}ms",
-                logPrefix, activeTasks.keySet(), standbyTasks.keySet(), commitTimeMs, now
- lastCommitMs);
+            if (log.isTraceEnabled()) {
+                log.trace("{} Committing all active tasks {} and standby tasks {} since {}ms
has elapsed (commit interval is {}ms)",
+                        logPrefix, activeTasks.keySet(), standbyTasks.keySet(), now - lastCommitMs,
commitTimeMs);
+            }
 
             commitAll();
+
+            if (log.isDebugEnabled()) {
+                log.info("{} Committed all active tasks {} and standby tasks {} in {}ms",
+                        logPrefix, activeTasks.keySet(), standbyTasks.keySet(), timerStartedMs - now);
+            }
+
             lastCommitMs = now;
 
             processStandbyRecords = true;
@@ -810,7 +816,6 @@ public class StreamThread extends Thread {
      * Commit the state of a task
      */
     private void commitOne(final AbstractTask task) {
-        log.trace("{} Committing {} {}", logPrefix, task.getClass().getSimpleName(), task.id());
         try {
             task.commit();
         } catch (final CommitFailedException e) {
@@ -825,7 +830,7 @@ public class StreamThread extends Thread {
         streamsMetrics.commitTimeSensor.record(computeLatency(), timerStartedMs);
     }
 
-    private void maybeUpdateStandbyTasks() {
+    private void maybeUpdateStandbyTasks(final long now) {
         if (!standbyTasks.isEmpty()) {
             if (processStandbyRecords) {
                 if (!standbyRecords.isEmpty()) {
@@ -846,6 +851,8 @@ public class StreamThread extends Thread {
                     }
 
                     standbyRecords = remainingStandbyRecords;
+
+                    log.debug("{} Updated standby tasks {} in {}ms", logPrefix, standbyTasks.keySet(),
time.milliseconds() - now);
                 }
                 processStandbyRecords = false;
             }
@@ -1076,8 +1083,7 @@ public class StreamThread extends Thread {
 
     @SuppressWarnings("ThrowableNotThrown")
     private void shutdownTasksAndState(final boolean cleanRun) {
-        log.debug("{} shutdownTasksAndState: shutting down" +
-                "active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby
tasks {}",
+        log.debug("{} Shutting down all active tasks {}, standby tasks {}, suspended tasks
{}, and suspended standby tasks {}",
             logPrefix, activeTasks.keySet(), standbyTasks.keySet(),
             suspendedTasks.keySet(), suspendedStandbyTasks.keySet());
 
@@ -1102,7 +1108,7 @@ public class StreamThread extends Thread {
      * soon the tasks will be assigned again
      */
     private void suspendTasksAndState()  {
-        log.debug("{} suspendTasksAndState: suspending all active tasks {} and standby tasks
{}",
+        log.debug("{} Suspending all active tasks {} and standby tasks {}",
             logPrefix, activeTasks.keySet(), standbyTasks.keySet());
 
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
@@ -1226,8 +1232,6 @@ public class StreamThread extends Thread {
     }
 
     protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
-        log.info("{} Creating active task {} with assigned partitions [{}]", logPrefix, id, partitions);
-
         streamsMetrics.taskCreatedSensor.record();
 
         try {
@@ -1245,7 +1249,7 @@ public class StreamThread extends Thread {
                 time,
                 createProducer(id));
         } finally {
-            log.info("{} Created active task {} with assigned partitions {}", logPrefix,
id, partitions);
+            log.trace("{} Created active task {} with assigned partitions {}", logPrefix,
id, partitions);
         }
     }
 
@@ -1277,7 +1281,7 @@ public class StreamThread extends Thread {
         final Map<TaskId, Set<TopicPartition>> newTasks = new HashMap<>();
 
         // collect newly assigned tasks and reopen re-assigned tasks
-        log.info("{} Adding assigned tasks as active {}", logPrefix, partitionAssignor.activeTasks());
+        log.debug("{} Adding assigned tasks as active: {}", logPrefix, partitionAssignor.activeTasks());
         for (final Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.activeTasks().entrySet()) {
             final TaskId taskId = entry.getKey();
             final Set<TopicPartition> partitions = entry.getValue();
@@ -1308,14 +1312,12 @@ public class StreamThread extends Thread {
 
         // create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
         // -> other thread will call removeSuspendedTasks(); eventually
-        log.debug("{} New active tasks to be created: {}", logPrefix, newTasks);
+        log.trace("{} New active tasks to be created: {}", logPrefix, newTasks);
 
         taskCreator.retryWithBackoff(newTasks, start);
     }
 
     private StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) {
-        log.debug("{} Creating new standby task {} with assigned partitions {}", logPrefix, id, partitions);
-
         streamsMetrics.taskCreatedSensor.record();
 
         final ProcessorTopology topology = builder.build(id.topicGroupId);
@@ -1324,10 +1326,10 @@ public class StreamThread extends Thread {
             try {
                 return new StandbyTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory);
             } finally {
-                log.info("{} Created standby task {} with assigned partitions {}", logPrefix, id, partitions);
+                log.trace("{} Created standby task {} with assigned partitions {}", logPrefix, id, partitions);
             }
         } else {
-            log.info("{} Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", logPrefix, id, partitions);
+            log.trace("{} Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", logPrefix, id, partitions);
 
             return null;
         }
@@ -1342,7 +1344,7 @@ public class StreamThread extends Thread {
 
         final Map<TaskId, Set<TopicPartition>> newStandbyTasks = new HashMap<>();
 
-        log.info("{} Adding assigned standby tasks {}", logPrefix, partitionAssignor.standbyTasks());
+        log.debug("{} Adding assigned standby tasks {}", logPrefix, partitionAssignor.standbyTasks());
         // collect newly assigned standby tasks and reopen re-assigned standby tasks
         for (final Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.standbyTasks().entrySet()) {
             final TaskId taskId = entry.getKey();
@@ -1361,7 +1363,7 @@ public class StreamThread extends Thread {
 
         // create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
         // -> other thread will call removeSuspendedStandbyTasks(); eventually
-        log.debug("{} New standby tasks to be created: {}", logPrefix, newStandbyTasks);
+        log.trace("{} New standby tasks to be created: {}", logPrefix, newStandbyTasks);
 
         new StandbyTaskCreator(checkpointedOffsets).retryWithBackoff(newStandbyTasks, start);
 
@@ -1397,14 +1399,13 @@ public class StreamThread extends Thread {
     }
 
     private void updateSuspendedTasks() {
-        log.info("{} Updating suspended tasks to contain active tasks {}", logPrefix, activeTasks.keySet());
         suspendedTasks.clear();
         suspendedTasks.putAll(activeTasks);
         suspendedStandbyTasks.putAll(standbyTasks);
     }
 
     private void removeStreamTasks() {
-        log.info("{} Removing all active tasks {}", logPrefix, activeTasks.keySet());
+        log.debug("{} Removing all active tasks {}", logPrefix, activeTasks.keySet());
 
         try {
             prevActiveTasks.clear();
@@ -1418,7 +1419,7 @@ public class StreamThread extends Thread {
     }
 
     private void removeStandbyTasks() {
-        log.info("{} Removing all standby tasks {}", logPrefix, standbyTasks.keySet());
+        log.debug("{} Removing all standby tasks {}", logPrefix, standbyTasks.keySet());
 
         standbyTasks.clear();
         standbyTasksByPartition.clear();

http://git-wip-us.apache.org/repos/asf/kafka/blob/59b57212/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 6ea32a9..c86d9df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -47,9 +47,6 @@ class NamedCache {
     private LRUNode head;
     private long currentSizeBytes;
     private NamedCacheMetrics namedCacheMetrics;
-    // JMX stats
-    private Sensor hitRatio = null;
-
 
     // internal stats
     private long numReadHits = 0;
@@ -148,9 +145,6 @@ class NamedCache {
         }
     }
 
-
-
-
     synchronized void put(final Bytes key, final LRUCacheEntry value) {
         if (!value.isDirty() && dirtyKeys.contains(key)) {
             throw new IllegalStateException(String.format("Attempting to put a clean entry
for key [%s] " +
@@ -378,15 +372,14 @@ class NamedCache {
             this.metricTags = new LinkedHashMap<>();
             this.metricTags.put(tagKey, tagValue);
 
-
             hitRatioSensor = this.metrics.registry().sensor(entityName + "-" + opName, Sensor.RecordingLevel.DEBUG);
 
             hitRatioSensor.add(this.metrics.registry().metricName(entityName + "-" + opName + "-avg", groupName,
-                "The current count of " + entityName + " " + opName + " operation.", metricTags), new Avg());
+                "The average cache hit ratio of " + entityName, metricTags), new Avg());
             hitRatioSensor.add(this.metrics.registry().metricName(entityName + "-" + opName + "-min", groupName,
-                "The current count of " + entityName + " " + opName + " operation.", metricTags), new Min());
+                "The minimum cache hit ratio of " + entityName, metricTags), new Min());
             hitRatioSensor.add(this.metrics.registry().metricName(entityName + "-" + opName + "-max", groupName,
-                "The current count of " + entityName + " " + opName + " operation.", metricTags), new Max());
+                "The maximum cache hit ratio of " + entityName, metricTags), new Max());
 
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/59b57212/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 45a6488..867473f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -39,7 +39,7 @@ import java.util.NoSuchElementException;
 public class ThreadCache {
     private static final Logger log = LoggerFactory.getLogger(ThreadCache.class);
 
-    private final String name;
+    private final String logPrefix;
     private final long maxCacheSizeBytes;
     private final StreamsMetrics metrics;
     private final Map<String, NamedCache> caches = new HashMap<>();
@@ -54,8 +54,8 @@ public class ThreadCache {
         void apply(final List<DirtyEntry> dirty);
     }
 
-    public ThreadCache(final String name, long maxCacheSizeBytes, final StreamsMetrics metrics) {
-        this.name = name;
+    public ThreadCache(final String logPrefix, long maxCacheSizeBytes, final StreamsMetrics metrics) {
+        this.logPrefix = logPrefix;
         this.maxCacheSizeBytes = maxCacheSizeBytes;
         this.metrics = metrics;
     }
@@ -96,8 +96,9 @@ public class ThreadCache {
         }
         cache.flush();
 
-        log.trace("Thread {} cache stats on flush: #puts={}, #gets={}, #evicts={}, #flushes={}",
-                  name, puts(), gets(), evicts(), flushes());
+        if (log.isTraceEnabled())
+            log.trace("{} Cache stats on flush: #puts={}, #gets={}, #evicts={}, #flushes={}",
+                    logPrefix, puts(), gets(), evicts(), flushes());
     }
 
     public LRUCacheEntry get(final String namespace, Bytes key) {
@@ -201,6 +202,7 @@ public class ThreadCache {
     }
 
     private void maybeEvict(final String namespace) {
+        int numEvicted = 0;
         while (sizeBytes() > maxCacheSizeBytes) {
             final NamedCache cache = getOrCreateCache(namespace);
             // we abort here as the put on this cache may have triggered
@@ -210,10 +212,12 @@ public class ThreadCache {
             if (cache.size() == 0) {
                 return;
             }
-            log.trace("Thread {} evicting cache {}", name, namespace);
             cache.evict();
             numEvicts++;
+            numEvicted++;
         }
+
+        log.debug("{} Evicted {} entries from cache {}", logPrefix, numEvicted, namespace);
     }
 
     private synchronized NamedCache getCache(final String namespace) {

