kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: fixes a few error logging formats
Date Wed, 28 Sep 2016 00:37:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 65b85e6a4 -> aef1c13a3


MINOR: fixes a few error logging formats

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Eno Thereska <eno.thereska@gmail.com>

Closes #1919 from guozhangwang/minor-error-message-fixes

(cherry picked from commit 0c4cc5a4466d0dfce01903c29d97af3e428597f3)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aef1c13a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aef1c13a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aef1c13a

Branch: refs/heads/0.10.1
Commit: aef1c13a32404a4534d72ba30c5d7652391c1488
Parents: 65b85e6
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Tue Sep 27 17:37:12 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Sep 27 17:37:22 2016 -0700

----------------------------------------------------------------------
 .../internals/ProcessorStateManager.java        |  8 +++----
 .../processor/internals/RecordCollector.java    |  2 +-
 .../processor/internals/StandbyTask.java        |  4 ++--
 .../processor/internals/StateDirectory.java     |  2 +-
 .../processor/internals/StreamThread.java       | 22 +++++++++-----------
 5 files changed, 18 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aef1c13a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 51a50e9..aa2f365 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -121,11 +121,11 @@ public class ProcessorStateManager {
     public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback)
{
 
         if (store.name().equals(CHECKPOINT_FILE_NAME)) {
-            throw new IllegalArgumentException(String.format("task [%s]  Illegal store name:
%s", taskId, CHECKPOINT_FILE_NAME));
+            throw new IllegalArgumentException(String.format("task [%s] Illegal store name:
%s", taskId, CHECKPOINT_FILE_NAME));
         }
 
         if (this.stores.containsKey(store.name())) {
-            throw new IllegalArgumentException(String.format("task [%s]  Store %s has already
been registered.", taskId, store.name()));
+            throw new IllegalArgumentException(String.format("task [%s] Store %s has already
been registered.", taskId, store.name()));
         }
 
         if (loggingEnabled) {
@@ -160,7 +160,7 @@ public class ProcessorStateManager {
 
             List<PartitionInfo> partitionInfos = restoreConsumer.partitionsFor(topic);
             if (partitionInfos == null) {
-                throw new StreamsException(String.format("task [%s]  Could not find partition
info for topic: %s", taskId, topic));
+                throw new StreamsException(String.format("task [%s] Could not find partition
info for topic: %s", taskId, topic));
             }
             for (PartitionInfo partitionInfo : partitionInfos) {
                 if (partitionInfo.partition() == partition) {
@@ -171,7 +171,7 @@ public class ProcessorStateManager {
         } while (partitionNotFound && System.currentTimeMillis() < startTime +
waitTime);
 
         if (partitionNotFound) {
-            throw new StreamsException(String.format("task [%s]  Store %s's change log (%s)
does not contain partition %s", taskId, store.name(), topic, partition));
+            throw new StreamsException(String.format("task [%s] Store %s's change log (%s)
does not contain partition %s", taskId, store.name(), topic, partition));
         }
 
         if (isStandby) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/aef1c13a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 3b53be7..fe88b95 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -84,7 +84,7 @@ public class RecordCollector {
                     offsets.put(tp, metadata.offset());
                 } else {
                     String prefix = String.format("task [%s]", streamTaskId);
-                    log.error("{} Error sending record to topic {}", prefix, topic, exception);
+                    log.error(String.format("%s Error sending record to topic %s", prefix,
topic), exception);
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/kafka/blob/aef1c13a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 384a1a2..40c4d9c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -86,12 +86,12 @@ public class StandbyTask extends AbstractTask {
      * @return a list of records not consumed
      */
     public List<ConsumerRecord<byte[], byte[]>> update(TopicPartition partition,
List<ConsumerRecord<byte[], byte[]>> records) {
-        log.debug("task [{}] updates for partition [{}]", id(), partition);
+        log.debug("task [{}] Updates for partition [{}]", id(), partition);
         return stateMgr.updateStandbyStates(partition, records);
     }
 
     public void commit() {
-        log.debug("task [{}] flushing", id());
+        log.debug("task [{}] Flushing", id());
         stateMgr.flush(processorContext);
 
         // reinitialize offset limits

http://git-wip-us.apache.org/repos/asf/kafka/blob/aef1c13a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index d81a91c..02abdeb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -155,7 +155,7 @@ public class StateDirectory {
 
     /**
      * List all of the task directories
-     * @return
+     * @return The list of all the existing local directories for stream tasks
      */
     public File[] listTaskDirectories() {
         return stateDir.listFiles(new FileFilter() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/aef1c13a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index cc424cc..9c3b07c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -229,7 +229,7 @@ public class StreamThread extends Thread {
      */
     @Override
     public void run() {
-        log.info("Starting stream thread [{}]", this.getName());
+        log.info("stream-thread [{}] Starting", this.getName());
 
         try {
             runLoop();
@@ -239,7 +239,7 @@ public class StreamThread extends Thread {
         } catch (Exception e) {
             // we have caught all Kafka related exceptions, and other runtime exceptions
             // should be due to user application errors
-            log.error("stream-thread [{}] Streams application error during processing: ",
this.getName(),  e);
+            log.error(String.format("stream-thread [%s] Streams application error during
processing: ", this.getName()),  e);
             throw e;
         } finally {
             shutdown();
@@ -258,7 +258,7 @@ public class StreamThread extends Thread {
     }
 
     private void shutdown() {
-        log.info("Shutting down stream-thread [{}]", this.getName());
+        log.info("stream-thread [{}] Shutting down", this.getName());
 
         // Exceptions should not prevent this call from going through all shutdown steps
         try {
@@ -322,7 +322,6 @@ public class StreamThread extends Thread {
             consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
         }
 
-
         while (stillRunning()) {
             this.timerStartedMs = time.milliseconds();
 
@@ -425,7 +424,6 @@ public class StreamThread extends Thread {
                     StandbyTask task = standbyTasksByPartition.get(partition);
 
                     if (task == null) {
-                        log.error("stream-thread [{}]  missing standby task for partition
{} ", this.getName(), partition);
                         throw new StreamsException(String.format("stream-thread [%s] missing
standby task for partition %s", this.getName(), partition));
                     }
 
@@ -456,7 +454,7 @@ public class StreamThread extends Thread {
                 sensors.punctuateTimeSensor.record(computeLatency());
 
         } catch (KafkaException e) {
-            log.error("stream-thread [{}] Failed to punctuate active task #{}", this.getName(),
task.id(), e);
+            log.error(String.format("stream-thread [%s] Failed to punctuate active task %s:
", this.getName(), task.id()), e);
             throw e;
         }
     }
@@ -509,10 +507,10 @@ public class StreamThread extends Thread {
             task.commit();
         } catch (CommitFailedException e) {
             // commit failed. Just log it.
-            log.warn("stream-thread [{}] Failed to commit {} #{}", this.getName(), task.getClass().getSimpleName(),
task.id(), e);
+            log.warn(String.format("stream-thread [%s] Failed to commit %s %s: ", this.getName(),
task.getClass().getSimpleName(), task.id()), e);
         } catch (KafkaException e) {
             // commit failed due to an unexpected exception. Log it and rethrow the exception.
-            log.error("stream-thread [{}] Failed to commit {} #{}", this.getName(), task.getClass().getSimpleName(),
task.id(), e);
+            log.error(String.format("stream-thread [%s] Failed to commit %s %s: ", this.getName(),
task.getClass().getSimpleName(), task.id()), e);
             throw e;
         }
 
@@ -596,7 +594,7 @@ public class StreamThread extends Thread {
                 for (TopicPartition partition : partitions)
                     activeTasksByPartition.put(partition, task);
             } catch (StreamsException e) {
-                log.error("stream-thread [{}] Failed to create an active task #{}", this.getName(),
taskId, e);
+                log.error(String.format("stream-thread [%s] Failed to create an active task
%s: ", this.getName(), taskId), e);
                 throw e;
             }
         }
@@ -614,7 +612,7 @@ public class StreamThread extends Thread {
             activeTasksByPartition.clear();
 
         } catch (Exception e) {
-            log.error("stream-thread [{}] Failed to remove stream tasks", this.getName(),
e);
+            log.error(String.format("stream-thread [%s] Failed to remove stream tasks: ",
this.getName()), e);
         }
     }
 
@@ -623,7 +621,7 @@ public class StreamThread extends Thread {
         try {
             task.close();
         } catch (StreamsException e) {
-            log.error("stream-thread [{}] Failed to close a {}  #{}", this.getName(), task.getClass().getSimpleName(),
task.id(), e);
+            log.error(String.format("stream-thread [%s] Failed to close a %s %s: ", this.getName(),
task.getClass().getSimpleName(), task.id()), e);
         }
         sensors.taskDestructionSensor.record();
     }
@@ -726,7 +724,7 @@ public class StreamThread extends Thread {
             restoreConsumer.assign(Collections.<TopicPartition>emptyList());
 
         } catch (Exception e) {
-            log.error("Failed to remove standby tasks in thread [{}]: ", this.getName(),
e);
+            log.error(String.format("stream-thread [%s] Failed to remove standby tasks: ",
this.getName()), e);
         }
     }
 


Mime
View raw message