kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-1302; cleanup logging in new producer; reviewed by Jay Kreps, Guozhang Wang and Neha Narkhede
Date Thu, 13 Mar 2014 21:26:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c124bbbb6 -> 84a3a9a3d


kafka-1302; cleanup logging in new producer; reviewed by Jay Kreps, Guozhang Wang and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/84a3a9a3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/84a3a9a3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/84a3a9a3

Branch: refs/heads/trunk
Commit: 84a3a9a3d9a5f1c4936dfb9d65b4bf0cd3b445d6
Parents: c124bbb
Author: Jun Rao <junrao@gmail.com>
Authored: Thu Mar 13 14:25:37 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Mar 13 14:25:37 2014 -0700

----------------------------------------------------------------------
 .../internals/ErrorLoggingCallback.java         | 43 ++++++++++++++++++++
 .../clients/producer/internals/Sender.java      | 11 ++---
 .../kafka/common/protocol/types/Struct.java     | 15 ++++++-
 .../kafka/common/requests/ProduceResponse.java  | 28 +++++++++++++
 .../kafka/tools/newproducer/MirrorMaker.scala   |  7 +++-
 core/src/main/scala/kafka/utils/Utils.scala     | 23 -----------
 .../scala/kafka/perf/ProducerPerformance.scala  |  5 ++-
 7 files changed, 98 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/84a3a9a3/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
new file mode 100644
index 0000000..368e8f3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ErrorLoggingCallback implements Callback {
+    private static final Logger log = LoggerFactory.getLogger(ErrorLoggingCallback.class);
+    private byte[] key;
+    private byte[] value;
+    private boolean logAsString;
+
+    public ErrorLoggingCallback(byte[] key, byte[] value, boolean logAsString) {
+        this.key = key;
+        this.value = value;
+        this.logAsString = logAsString;
+    }
+
+    public void onCompletion(RecordMetadata metadata, Exception e) {
+        if (e != null) {
+            String keyString = (key == null) ? "null" :
+                    logAsString ? new String(key) : key.length + " bytes";
+            String valueString = (value == null) ? "null" :
+                    logAsString ? new String(value) : value.length + " bytes";
+            log.error("Error when sending message with key: " + keyString + ", value: " + valueString +
+                    " with error " + e.getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/84a3a9a3/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7b5d144..565331d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -197,7 +197,7 @@ public class Sender implements Runnable {
         if (ready.size() > 0) {
             log.trace("Partitions with complete batches: {}", ready);
             log.trace("Partitions ready to initiate a request: {}", sendable);
-            log.trace("Created {} requests: {}", requests.size(), requests);
+            log.trace("Created {} produce requests: {}", requests.size(), requests);
         }
 
         for (int i = 0; i < requests.size(); i++) {
@@ -233,11 +233,11 @@ public class Sender implements Runnable {
 
         if (nodeStates.isConnected(node.id())) {
             Set<String> topics = metadata.topics();
-            log.debug("Sending metadata update request for topics {} to node {}", topics, node.id());
             this.metadataFetchInProgress = true;
-            InFlightRequest request = metadataRequest(node.id(), topics);
-            sends.add(request.request);
-            this.inFlightRequests.add(request);
+            InFlightRequest metadataRequest = metadataRequest(node.id(), topics);
+            log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
+            sends.add(metadataRequest.request);
+            this.inFlightRequests.add(metadataRequest);
         } else if (nodeStates.canConnect(node.id(), now)) {
             // we don't have a connection to this node right now, make one
             initiateConnect(node, now);
@@ -345,6 +345,7 @@ public class Sender implements Runnable {
             nodeStates.disconnected(node);
             log.debug("Node {} disconnected.", node);
             for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
+                log.trace("Cancelled request {} due to node {} being disconnected", request, node);
                 ApiKeys requestKey = ApiKeys.forId(request.request.header().apiKey());
                 switch (requestKey) {
                     case PRODUCE:

http://git-wip-us.apache.org/repos/asf/kafka/blob/84a3a9a3/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 73ad6cd..dc03fd0 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -230,9 +230,20 @@ public class Struct {
         StringBuilder b = new StringBuilder();
         b.append('{');
         for (int i = 0; i < this.values.length; i++) {
-            b.append(this.schema.get(i).name);
+            Field f = this.schema.get(i);
+            b.append(f.name);
             b.append('=');
-            b.append(this.values[i]);
+            if (f.type() instanceof ArrayOf) {
+                Object[] arrayValue = (Object[]) this.values[i];
+                b.append('[');
+                for (int j = 0; j < arrayValue.length; j++) {
+                    b.append(arrayValue[j]);
+                    if (j < arrayValue.length - 1)
+                        b.append(',');
+                }
+                b.append(']');
+            } else
+                b.append(this.values[i]);
             if (i < this.values.length - 1)
                 b.append(',');
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/84a3a9a3/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 6ac2e53..6fa4a58 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -29,6 +29,16 @@ public class ProduceResponse {
             this.errorCode = errorCode;
             this.baseOffset = baseOffset;
         }
+        @Override
+        public String toString() {
+            StringBuilder b = new StringBuilder();
+            b.append('{');
+            b.append("pid: " + partitionId);
+            b.append(",error: " + errorCode);
+            b.append(",offset: " + baseOffset);
+            b.append('}');
+            return b.toString();
+        }
     }
 
     private final Map<String, Map<TopicPartition, PartitionResponse>> responses;
@@ -54,4 +64,22 @@ public class ProduceResponse {
     public Map<String, Map<TopicPartition, PartitionResponse>> responses() {
         return this.responses;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder b = new StringBuilder();
+        b.append('{');
+        boolean isFirst = true;
+        for (Map<TopicPartition, PartitionResponse> response : responses.values()) {
+            for (Map.Entry<TopicPartition, PartitionResponse> entry : response.entrySet()) {
+                if (isFirst)
+                    isFirst = false;
+                else
+                    b.append(',');
+                b.append(entry.getKey() + " : " + entry.getValue());
+            }
+        }
+        b.append('}');
+        return b.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/84a3a9a3/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
index d1dc13b..6f90549 100644
--- a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
@@ -24,6 +24,7 @@ import kafka.consumer._
 import collection.mutable.ListBuffer
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
 import java.util.concurrent.atomic.AtomicInteger
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 
 
 object MirrorMaker extends Logging {
@@ -168,10 +169,12 @@ object MirrorMaker extends Logging {
         val producerId = Utils.abs(java.util.Arrays.hashCode(producerRecord.key())) % producers.size
         trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(producerRecord.key()), producerId))
         val producer = producers(producerId)
-        producer.send(producerRecord, Utils.errorLoggingCallback(producerRecord.key(), producerRecord.value()))
+        producer.send(producerRecord,
+                      new ErrorLoggingCallback(producerRecord.key(), producerRecord.value(), false))
       } else {
         val producerId = producerIndex.getAndSet((producerIndex.get() + 1) % producers.size)
-        producers(producerId).send(producerRecord, Utils.errorLoggingCallback(producerRecord.key(), producerRecord.value()))
+        producers(producerId).send(producerRecord,
+                                   new ErrorLoggingCallback(producerRecord.key(), producerRecord.value(), false))
         trace("Sent message to producer " + producerId)
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/84a3a9a3/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 33e05f0..6bfbac1 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -25,12 +25,10 @@ import java.util.concurrent.locks.Lock
 import java.lang.management._
 import javax.management._
 import scala.collection._
-import mutable.ListBuffer
 import scala.collection.mutable
 import java.util.Properties
 import kafka.common.KafkaException
 import kafka.common.KafkaStorageException
-import org.apache.kafka.clients.producer.{RecordMetadata, Callback}
 
 
 /**
@@ -541,25 +539,4 @@ object Utils extends Logging {
       lock.unlock()
     }
   }
-
-  def errorLoggingCallback(key: Array[Byte], value: Array[Byte], logAsString: Boolean = false) = {
-    new Callback() {
-      def onCompletion(metadata: RecordMetadata, e: Exception) {
-        if (e != null) {
-          val keyString = if (key == null)
-                            "null"
-                          else {
-                            if (logAsString) new String(key) else key.length + " bytes"
-                          }
-          val valueString = if (value == null)
-                            "null"
-                          else {
-                            if (logAsString) new String(value) else value.length + " bytes"
-                          }
-          error("Error when sending message with key: " + keyString + ", value: " + valueString +
-                " with exception " + e.getMessage)
-        }
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/84a3a9a3/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index f061dba..f12a45b 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -26,9 +26,10 @@ import java.text.SimpleDateFormat
 import kafka.serializer._
 import java.util._
 import collection.immutable.List
-import kafka.utils.{ VerifiableProperties, Logging, Utils }
+import kafka.utils.{VerifiableProperties, Logging, Utils}
 import kafka.metrics.KafkaMetricsReporter
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 
 /**
  * Load test for the producer
@@ -219,7 +220,7 @@ object ProducerPerformance extends Logging {
         this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes)).get()
       } else {
         this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes),
-                           Utils.errorLoggingCallback(null, bytes, if (config.seqIdMode) true else false))
+                           new ErrorLoggingCallback(null, bytes, if (config.seqIdMode) true else false))
       }
     }
 


Mime
View raw message