kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-1307; potential socket leak in new producer and clean up; reviewed by Jay Kreps, Guozhang Wang and Neha Narkhede
Date Tue, 18 Mar 2014 17:54:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 423d9d5af -> cc859dcca


kafka-1307; potential socket leak in new producer and clean up; reviewed by Jay Kreps, Guozhang
Wang and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cc859dcc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cc859dcc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cc859dcc

Branch: refs/heads/trunk
Commit: cc859dcca791dc8e1e6eeabe8b88387a11241b24
Parents: 423d9d5
Author: Jun Rao <junrao@gmail.com>
Authored: Tue Mar 18 10:51:50 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Mar 18 10:52:34 2014 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/producer/internals/BufferPool.java  | 3 ++-
 .../clients/producer/internals/ErrorLoggingCallback.java     | 8 +++++---
 .../apache/kafka/clients/producer/internals/Metadata.java    | 8 +++++---
 .../kafka/clients/producer/internals/RecordAccumulator.java  | 6 +++---
 .../java/org/apache/kafka/common/config/AbstractConfig.java  | 6 +++++-
 .../main/java/org/apache/kafka/common/config/ConfigDef.java  | 2 +-
 .../main/java/org/apache/kafka/common/network/Selector.java  | 5 ++++-
 .../java/org/apache/kafka/common/protocol/types/Struct.java  | 4 +++-
 .../java/org/apache/kafka/common/network/SelectorTest.java   | 2 +-
 .../src/main/scala/kafka/tools/newproducer/MirrorMaker.scala | 4 ++--
 perf/src/main/scala/kafka/perf/ProducerPerformance.scala     | 2 +-
 11 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cc859dcc/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index 5bed607..b69866a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -65,7 +65,8 @@ public final class BufferPool {
     }
 
     /**
-     * Allocate a buffer of the given size
+     * Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool
+     * is configured with blocking mode.
      * 
      * @param size The buffer size to allocate in bytes
      * @return The buffer

http://git-wip-us.apache.org/repos/asf/kafka/blob/cc859dcc/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
index 368e8f3..678d1c6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
@@ -20,11 +20,13 @@ import org.slf4j.LoggerFactory;
 
 public class ErrorLoggingCallback implements Callback {
     private static final Logger log = LoggerFactory.getLogger(ErrorLoggingCallback.class);
+    private String topic;
     private byte[] key;
     private byte[] value;
     private boolean logAsString;
 
-    public ErrorLoggingCallback(byte[] key, byte[] value, boolean logAsString) {
+    public ErrorLoggingCallback(String topic, byte[] key, byte[] value, boolean logAsString) {
+        this.topic = topic;
         this.key = key;
         this.value = value;
         this.logAsString = logAsString;
@@ -36,8 +38,8 @@ public class ErrorLoggingCallback implements Callback {
                     logAsString ? new String(key) : key.length + " bytes";
             String valueString = (value == null) ? "null" :
                     logAsString ? new String(value) : value.length + " bytes";
-            log.error("Error when sending message with key: " + keyString + ", value: " + valueString +
-                    " with error " + e.getMessage());
+            log.error("Error when sending message to topic {} with key: {}, value: {} with error: {}",
+                      topic, keyString, valueString, e.getMessage());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cc859dcc/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index db6e3a1..33d62a4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -79,6 +79,7 @@ public final class Metadata {
     public synchronized Cluster fetch(String topic, long maxWaitMs) {
         List<PartitionInfo> partitions = null;
         long begin = System.currentTimeMillis();
+        long remainingWaitMs = maxWaitMs;
         do {
             partitions = cluster.partitionsFor(topic);
             if (partitions == null) {
@@ -86,12 +87,13 @@ public final class Metadata {
                 forceUpdate = true;
                 try {
                     log.trace("Requesting metadata update for topic {}.", topic);
-                    wait(maxWaitMs);
+                    wait(remainingWaitMs);
                 } catch (InterruptedException e) { /* this is fine, just try again */
                 }
-                long ellapsed = System.currentTimeMillis() - begin;
-                if (ellapsed >= maxWaitMs)
+                long elapsed = System.currentTimeMillis() - begin;
+                if (elapsed >= maxWaitMs)
                     throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+                remainingWaitMs = maxWaitMs - elapsed;
             } else {
                 return cluster;
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cc859dcc/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 7a03f38..673b296 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -143,9 +143,9 @@ public final class RecordAccumulator {
         log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
         ByteBuffer buffer = free.allocate(size);
         synchronized (dq) {
-            RecordBatch first = dq.peekLast();
-            if (first != null) {
-                FutureRecordMetadata future = first.tryAppend(key, value, compression, callback);
+            RecordBatch last = dq.peekLast();
+            if (last != null) {
+                FutureRecordMetadata future = last.tryAppend(key, value, compression, callback);
                 if (future != null) {
                     // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen
                     // often...

http://git-wip-us.apache.org/repos/asf/kafka/blob/cc859dcc/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index c989e25..84a327e 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -65,10 +65,14 @@ public class AbstractConfig {
         return (Integer) get(key);
     }
 
-    public Long getLong(String key) {
+    public long getLong(String key) {
         return (Long) get(key);
     }
 
+    public double getDouble(String key) {
+        return (Double) get(key);
+    }
+
     @SuppressWarnings("unchecked")
     public List<String> getList(String key) {
         return (List<String>) get(key);

http://git-wip-us.apache.org/repos/asf/kafka/blob/cc859dcc/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 61257d1..67b349d 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -181,7 +181,7 @@ public class ConfigDef {
                     else if (value instanceof String)
                         return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
                     else
-                        throw new ConfigException(name, value, "Expected a comma seperated list.");
+                        throw new ConfigException(name, value, "Expected a comma separated list.");
                 case CLASS:
                     if (value instanceof Class)
                         return (Class<?>) value;

http://git-wip-us.apache.org/repos/asf/kafka/blob/cc859dcc/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index f83189d..9839632 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -96,7 +96,7 @@ public class Selector implements Selectable {
      * @param sendBufferSize The send buffer for the new connection
      * @param receiveBufferSize The receive buffer for the new connection
      * @throws IllegalStateException if there is already a connection for that id
-     * @throws UnresolvedAddressException if DNS resolution fails on the hostname
+     * @throws IOException if DNS resolution fails on the hostname or if the broker is down
      */
     @Override
     public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
@@ -111,6 +111,9 @@ public class Selector implements Selectable {
             channel.connect(address);
         } catch (UnresolvedAddressException e) {
             channel.close();
+            throw new IOException("Can't resolve address: " + address, e);
+        } catch (IOException e) {
+            channel.close();
             throw e;
         }
         SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT);

http://git-wip-us.apache.org/repos/asf/kafka/blob/cc859dcc/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index dc03fd0..8cecba5 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -150,7 +150,9 @@ public class Struct {
     }
 
     /**
-     * Create a struct for the schema of a container type (struct or array)
+     * Create a struct for the schema of a container type (struct or array).
+     * Note that for array type, this method assumes that the type is an array of schema and creates a struct
+     * of that schema. Arrays of other types can't be instantiated with this method.
      * 
      * @param field The field to create an instance of
      * @return The struct

http://git-wip-us.apache.org/repos/asf/kafka/blob/cc859dcc/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 865996c..90e2dcf 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -125,7 +125,7 @@ public class SelectorTest {
     /**
      * Sending a request to a node with a bad hostname should result in an exception during connect
      */
-    @Test(expected = UnresolvedAddressException.class)
+    @Test(expected = IOException.class)
     public void testNoRouteToHost() throws Exception {
         selector.connect(0, new InetSocketAddress("asdf.asdf.dsc", server.port), BUFFER_SIZE, BUFFER_SIZE);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cc859dcc/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
index 6f90549..a969a22 100644
--- a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
@@ -170,11 +170,11 @@ object MirrorMaker extends Logging {
         trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(producerRecord.key()), producerId))
         val producer = producers(producerId)
         producer.send(producerRecord,
-                      new ErrorLoggingCallback(producerRecord.key(), producerRecord.value(), false))
+                      new ErrorLoggingCallback(producerRecord.topic(), producerRecord.key(), producerRecord.value(), false))
       } else {
         val producerId = producerIndex.getAndSet((producerIndex.get() + 1) % producers.size)
         producers(producerId).send(producerRecord,
-                                   new ErrorLoggingCallback(producerRecord.key(), producerRecord.value(), false))
+                                   new ErrorLoggingCallback(producerRecord.topic(), producerRecord.key(), producerRecord.value(), false))
         trace("Sent message to producer " + producerId)
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cc859dcc/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index f12a45b..3df0d13 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -220,7 +220,7 @@ object ProducerPerformance extends Logging {
         this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes)).get()
       } else {
         this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes),
-                           new ErrorLoggingCallback(null, bytes, if (config.seqIdMode) true else false))
+                           new ErrorLoggingCallback(topic, null, bytes, if (config.seqIdMode) true else false))
       }
     }
 


Mime
View raw message